Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2017/12/17 06:52:20 UTC

A question about Triggers

I want to augment a POJO in a Trigger's onElement() method; specifically, I
want to supply the POJO with the watermark from the TriggerContext. The
sequence of execution is:

1. add() is called on the window's accumulator, which saves a reference to
the POJO.
2. onElement() is called on the Trigger.
3. The watermark is set on the POJO.

The next add() call should see the last reference, including any mutation
made in step 3.

That works in a local test case using LocalFlinkMiniCluster (the mutation
made by onElement() is visible on the POJO in the subsequent add()), but not
on a distributed cluster. My specific question is whether add() on a
window's accumulator and onElement() of the trigger on that window are
executed inline on the same thread, or whether some
serialization/deserialization or IPC causes this divergence (local versus
distributed).

Regards.

Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
I think I got it.
Glad you solved this tricky issue, and thanks for sharing your solution :-)

Best, Fabian


2018-01-06 14:33 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> Yep, this is suboptimal, though, as you imagined. Two things:
>
> * <IN> internally has an <INLite>, an ultra-lite version of IN that is
> only required for the path analysis.
> * Since sessionization is expensive, we piggyback multiple other
> aggregations that do not depend on the path or order (count, etc.).
> Essentially, a Session is (ordered path + accumulated stats).
>
> The code seems pretty much all right; please tell me if you need to see it.
> It is all generics, so there are no secrets here.
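The "lite projection plus piggybacked stats" layout described above might look roughly like this in plain Java. This is a sketch under assumptions: `FullEvent`, `LiteEvent`, and `PathSession` are hypothetical names (the real <IN>/<INLite> types are not shown in the thread), and the two stats (an event count and a payload length) are only examples. Only the lite projection is kept for the ordered path, while the order-independent stats are folded in as events arrive.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical full event; the payload stands in for fields path analysis does not need.
final class FullEvent {
    final long timestamp;
    final String page;
    final String payload;

    FullEvent(long timestamp, String page, String payload) {
        this.timestamp = timestamp;
        this.page = page;
        this.payload = payload;
    }

    LiteEvent toLite() {
        return new LiteEvent(timestamp, page);
    }
}

// Hypothetical "ultra-lite" projection: only what the path analysis needs.
final class LiteEvent {
    final long timestamp;
    final String page;

    LiteEvent(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

// Session = ordered path (of lite events) + order-independent accumulated stats.
final class PathSession {
    private final List<LiteEvent> path = new ArrayList<>();
    private long count;        // piggybacked stat: number of events
    private long payloadChars; // piggybacked stat: total payload length

    void add(FullEvent e) {
        path.add(e.toLite()); // keep only the projection for the path
        count++;
        payloadChars += e.payload.length();
    }

    // Pages in event-time order; events may have arrived out of order.
    List<String> orderedPages() {
        List<LiteEvent> sorted = new ArrayList<>(path);
        sorted.sort(Comparator.comparingLong(l -> l.timestamp));
        List<String> pages = new ArrayList<>();
        for (LiteEvent l : sorted) {
            pages.add(l.page);
        }
        return pages;
    }

    long count() { return count; }

    long payloadChars() { return payloadChars; }

    public static void main(String[] args) {
        PathSession s = new PathSession();
        s.add(new FullEvent(20, "checkout", "xx"));
        s.add(new FullEvent(10, "home", "yyy"));
        System.out.println(s.orderedPages() + " count=" + s.count() + " chars=" + s.payloadChars());
    }
}
```

Keeping the full event out of the path is what keeps the per-session state small; the stats never need the order, so they cost a constant amount per session.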
>
> On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> you would not need the ListStateDescriptor. A ProcessWindowFunction
>> stores all events that are assigned to a window (IN objects in your case)
>> in an internal ListState.
>> The Iterable<IN> parameter of the process() method iterates over the
>> internal list state.
>>
>> So you would have a Trigger that fires when a new watermark is received
>> (or in regular intervals, like every minute) and at the end of the window.
>> The process() method looks up the current watermark in the Context
>> object, traverses the Iterable<IN> filtering out all events with
>> timestamp > watermark (you would need to enrich the events with the
>> timestamp, which can be done in a ProcessFunction), inserts the remaining
>> ones into a sorted data structure (possibly leveraging the almost sorted
>> nature of the events), and creates a Session from it.
>>
>> This is probably less efficient than your ProcessFunction because
>> process() would go over the complete list over and over again and not be
>> able to persist the result of previous invocations.
>> However, the code should be easier to maintain.
>>
>> Does that make sense?
>>
>> Best, Fabian
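The process() logic described above can be sketched in plain Java, outside Flink. Assumptions: `TimedEvent`, `WatermarkFilter`, and `sessionUpToWatermark` are hypothetical names, the `elements` iterable stands in for the window's internal list state, and `watermark` stands in for the value the window Context would report. Each call rescans the full list, which is the inefficiency mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event already enriched with its event-time timestamp.
final class TimedEvent {
    final long timestamp;
    final String page;

    TimedEvent(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

final class WatermarkFilter {
    // Rescans every buffered element on each firing: keeps only events with
    // timestamp <= watermark and returns them sorted by event time.
    static List<TimedEvent> sessionUpToWatermark(Iterable<TimedEvent> elements, long watermark) {
        List<TimedEvent> ready = new ArrayList<>();
        for (TimedEvent e : elements) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            }
        }
        // List.sort uses TimSort, which is fast on almost-sorted input.
        ready.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
        return ready;
    }

    public static void main(String[] args) {
        List<TimedEvent> window = List.of(
                new TimedEvent(3, "cart"), new TimedEvent(1, "home"),
                new TimedEvent(9, "pay"), new TimedEvent(2, "search"));
        for (TimedEvent e : sessionUpToWatermark(window, 5)) {
            System.out.println(e.timestamp + " " + e.page);
        }
    }
}
```

Nothing is removed from the input, so the event at timestamp 9 is simply seen again on the next firing.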
>>
>> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> Hello Fabian, thank you for your response.
>>>
>>> I thought about it and maybe I am missing something obvious here. The
>>> code below is what I think you suggest. The issue is that the window
>>> now holds a list of Sessions (or rather, subsets of the Session).
>>>
>>> What is required is that on a new watermark
>>>
>>> * we sort these Session objects, and
>>> * get the subset that falls before the new watermark and emit it
>>> without purging.
>>>
>>> I do not see how the Trigger approach helps us. It does tell us that the
>>> watermark has progressed, but to get the subset of the ListState that
>>> falls before the watermark, we would need access to *the new value of
>>> the watermark*. That is what my initial query was about.
>>>
>>>
>>>
>>> public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>>
>>>
>>>     OUT toCreateNew;
>>>     Long gap;
>>>     private final ListStateDescriptor< OUT> mergingSetsStateDescriptor;
>>>
>>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>>>                                 OUT toCreateNew) {
>>>         this.toCreateNew = toCreateNew;
>>>         mergingSetsStateDescriptor =
>>>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>>>     }
>>>     @Override
>>>     public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
>>>         OUT session = toCreateNew.createNew();
>>>         elements.forEach(f -> session.add(f));
>>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>>     }
>>> }
>>>
>>>
>>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> thanks for sharing your solution!
>>>>
>>>> Looking at this issue again and at your mail in which you shared your
>>>> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
>>>> ValueState that prevents the ProcessWindowFunction from being used in a
>>>> mergeable window.
>>>> You could have created a new Session object in each invocation of the
>>>> ProcessWindowFunction and simply kept the elements in the (mergeable) list
>>>> state of the window.
>>>> In that case you would simply need a custom trigger that calls the
>>>> ProcessWindowFunction when a new watermark arrives. For intermediate calls,
>>>> you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
>>>> elements from the window's state.
>>>> Did you try that?
>>>>
>>>> Best, Fabian
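A rough, Flink-free simulation of the trigger behaviour suggested above: an intermediate firing on a watermark advance evaluates the window but keeps its contents (FIRE), and the firing at the window's end evaluates and clears it (FIRE_AND_PURGE). The `TriggerDecision` enum only mirrors the names of the trigger results; it and `SimulatedWindow` are hypothetical stand-ins, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Names mirror the trigger results mentioned in the text; this is not Flink's enum.
enum TriggerDecision { CONTINUE, FIRE, FIRE_AND_PURGE }

// Hypothetical stand-in for one window: its buffered elements plus the trigger decision.
final class SimulatedWindow {
    final long maxTimestamp;                            // last timestamp belonging to the window
    final List<Long> contents = new ArrayList<>();      // stands in for the window's list state
    final List<List<Long>> emitted = new ArrayList<>(); // what the window function saw per firing

    SimulatedWindow(long maxTimestamp) {
        this.maxTimestamp = maxTimestamp;
    }

    // Decision for an event-time firing caused by a watermark at 'time'.
    TriggerDecision onEventTime(long time) {
        return time >= maxTimestamp
                ? TriggerDecision.FIRE_AND_PURGE // end of window: final result, then clear
                : TriggerDecision.FIRE;          // intermediate result, keep the elements
    }

    // Apply the decision the way a window operator would.
    void advanceWatermark(long watermark) {
        TriggerDecision d = onEventTime(watermark);
        if (d != TriggerDecision.CONTINUE) {
            emitted.add(new ArrayList<>(contents)); // "call the ProcessWindowFunction"
        }
        if (d == TriggerDecision.FIRE_AND_PURGE) {
            contents.clear();
        }
    }

    public static void main(String[] args) {
        SimulatedWindow w = new SimulatedWindow(100);
        w.contents.add(10L);
        w.advanceWatermark(20);  // FIRE: the function sees [10], state is kept
        w.contents.add(30L);
        w.advanceWatermark(100); // FIRE_AND_PURGE: the function sees [10, 30], state is cleared
        System.out.println(w.emitted + " remaining=" + w.contents);
    }
}
```

Because every firing re-reads the whole list, the window function has to rebuild the Session each time; that is the trade-off discussed later in the thread.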
>>>>
>>>>
>>>>
>>>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>
>>>>> Dear Fabian,
>>>>>
>>>>> I was able to create a fairly functional ProcessFunction. Here is a
>>>>> synopsis; please let me know if it makes sense.
>>>>>
>>>>> Sessionization is unique in that it entails windows of dynamic length.
>>>>> Flink's approach is pretty simple: it creates a TimeWindow of size "gap"
>>>>> relative to the event time, finds an overlapping (intersecting) window,
>>>>> and creates a covering window. Each such window has a "state" associated
>>>>> with it, which also has to be merged when a covering window is created
>>>>> from the intersection of 2 or more incident windows. To be more precise,
>>>>> if Window1 spans (t1, t2), a new record creates a window (t3, t4), and
>>>>> t1 <= t3 <= t2, then a new window (t1, max(t2, t4)) is created and the
>>>>> associated states are merged.
>>>>>
>>>>>
>>>>> In the current Window API the states are external and accumulator
>>>>> based. This approach works for pretty much all cases where the
>>>>> aggregation is accumulative/reducible and does not depend on order,
>>>>> i.e. no ordered list of incoming records needs to be kept and the
>>>>> reduction is to a single aggregated element (think counts, min, max,
>>>>> etc.). In path analysis (and other use cases), however, this approach
>>>>> has drawbacks. Even though our accumulator could keep an ordered list of
>>>>> events, that becomes unreasonable if it is not bounded. An approach that
>>>>> does *attempt* to bound state is to preemptively analyze paths, using the
>>>>> WM as the marker that defines the *subset* of the state that is safe
>>>>> to analyze. So if we have n events in the window state and m fall before
>>>>> the WM, we can safely analyze the m subset, emitting the paths seen and
>>>>> reducing the cumulative state size. There are caveats, though, that I
>>>>> will go into later.
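A minimal sketch of "analyze the m events before the WM and drop them from state", assuming a per-key buffer ordered by event time. `WatermarkBuffer` and `reduceToWatermark` are hypothetical names, and in a real job the buffer would live in Flink keyed state rather than in a Java field. Unlike a full rescan, the drained prefix is removed, so state shrinks as the watermark advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-key buffer of events ordered by event time.
final class WatermarkBuffer {
    // timestamp -> pages seen at that timestamp (handles duplicate timestamps)
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private int size;

    void add(long timestamp, String page) {
        pending.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(page);
        size++;
    }

    // Removes and returns, in event-time order, every event with timestamp <= watermark.
    // Only this "safe" prefix is analyzed; later events stay buffered.
    List<String> reduceToWatermark(long watermark) {
        List<String> ready = new ArrayList<>();
        Map<Long, List<String>> prefix = pending.headMap(watermark, true);
        for (List<String> pages : prefix.values()) {
            ready.addAll(pages);
        }
        size -= ready.size();
        prefix.clear(); // clearing the view removes these entries from 'pending'
        return ready;
    }

    int size() {
        return size;
    }

    public static void main(String[] args) {
        WatermarkBuffer buf = new WatermarkBuffer();
        buf.add(5, "cart");
        buf.add(1, "home");
        buf.add(12, "pay");
        buf.add(3, "search");
        System.out.println(buf.reduceToWatermark(6) + " still buffered: " + buf.size());
    }
}
```

A sorted map keeps the "before the WM" prefix cheap to find and cheap to drop; a plain list would need a scan and a sort on every watermark.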
>>>>>
>>>>>
>>>>> Unfortunately, the accumulators used by Flink's default window runtime
>>>>> do not have access to the WM.
>>>>>
>>>>>
>>>>> This led to the following generic approach (implemented and tested):
>>>>>
>>>>>
>>>>> * Use a low-level ProcessFunction, which allows access to the WM and is
>>>>> definitely nearer to the guts of Flink.
>>>>>
>>>>>
>>>>> * Still merge windows on intersection, but use the WM to trigger
>>>>> (through timers) reductions in state. This is not very different from
>>>>> what Flink does, but we have more control over what to do and when to do
>>>>> it. Essentially, we have exposed a lifecycle method that reacts to WM
>>>>> progression.
>>>>>
>>>>>
>>>>> * There are essentially 2 timers. The first is set at the window's
>>>>> maxTimestamp(); if there is no further mutation (because of a merge,
>>>>> etc.), it fires to signal a session end. The second is set at
>>>>> currentWatermark + 1 and calls a "reduceToWM" on each keyed window, and
>>>>> thus on its state.
>>>>>
>>>>>
>>>>> * There are 2 ways to short-circuit a session: 1. on session time span,
>>>>> 2. on session size.
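The two short-circuit rules could look roughly like this. The thresholds (`maxSpanMillis`, `maxEvents`) and the `SessionLimits` class are hypothetical, since the actual limits are not given in the thread.

```java
// Hypothetical guard that closes a session early on time span or size.
final class SessionLimits {
    final long maxSpanMillis;
    final int maxEvents;

    SessionLimits(long maxSpanMillis, int maxEvents) {
        this.maxSpanMillis = maxSpanMillis;
        this.maxEvents = maxEvents;
    }

    // True if the session should be emitted and closed now, even though the
    // gap-based session end has not been reached yet.
    boolean shouldShortCircuit(long firstEventTime, long lastEventTime, int eventCount) {
        boolean tooLong = lastEventTime - firstEventTime >= maxSpanMillis;
        boolean tooBig = eventCount >= maxEvents;
        return tooLong || tooBig;
    }

    public static void main(String[] args) {
        SessionLimits limits = new SessionLimits(60 * 60 * 1000L, 10_000); // 1 hour, 10k events
        System.out.println(limits.shouldShortCircuit(0L, 30_000L, 12));       // within both limits
        System.out.println(limits.shouldShortCircuit(0L, 7_200_000L, 12));    // span exceeded
        System.out.println(limits.shouldShortCircuit(0L, 30_000L, 10_000));   // size reached
    }
}
```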
>>>>>
>>>>>
>>>>> * There is a safety valve to blacklist keys when it is obvious that it
>>>>> is a bot ( again
>>>>>
>>>>>
>>>>> The solution thus preemptively pushes out patterns (and correct
>>>>> patterns) while keeping the ordered state within reasonable bounds. The
>>>>> incident data of course still has to be analyzed (are the paths too
>>>>> large, etc.), but one has full control over how to fashion the solution.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards and Thanks,
>>>>>
>>>>>
>>>>> Vishal
>>>>>
>>>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> This makes sense.  Thanks.
>>>>>>
>>>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> all calls to onElement() or onTimer() are synchronized for all keys.
>>>>>>> Think of a single thread calling these methods.
>>>>>>> Event-time timers are called when a watermark passes the timer.
>>>>>>> Watermarks are received as special records, so the methods are called in
>>>>>>> the same order as records (actual records or watermarks) arrive at the
>>>>>>> function. Only for processing-time timers, actual synchronization is
>>>>>>> required.
>>>>>>>
>>>>>>> The NPE might be thrown because of two timers that fire one after
>>>>>>> the other without a new record being processed in between the onTimer()
>>>>>>> calls. In that case the state is cleared in the first call and null in the
>>>>>>> second.
>>>>>>>
>>>>>>> Best, Fabian
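To make the explanation above concrete, here is a single-threaded, Flink-free simulation. It is a sketch under assumptions: a `HashMap` stands in for keyed `ValueState`, a `TreeMap` stands in for the event-time timer queue, and `SessionTimeoutSim` and its methods are hypothetical. With a gap of 10 and records at timestamps 5 and then 3 (out of order, but both ahead of the watermark), timers at 13 and 15 are pending. In the original logic the last record wins, so the timer at 13 clears the state and the timer at 15 finds nothing, which is where the NullPointerException came from. The sketch counts that case as a stale timer instead of dereferencing null, and can optionally keep the maximum timestamp so the earlier timer no longer matches.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Single-threaded simulation of a keyed function with event-time timers.
final class SessionTimeoutSim {
    final long gap;
    final boolean keepMax;                                         // optional defensive change
    final Map<String, Long> lastModified = new HashMap<>();        // stands in for ValueState
    final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>(); // timestamp -> keys
    long watermark = Long.MIN_VALUE;
    int sessionsClosed;
    int staleTimers; // timers that found no state (the NPE case in the original code)

    SessionTimeoutSim(long gap, boolean keepMax) {
        this.gap = gap;
        this.keepMax = keepMax;
    }

    void processElement(String key, long timestamp) {
        if (timestamp <= watermark) {
            return; // late record: ignored, as in the original code
        }
        if (keepMax) {
            // An out-of-order record cannot move the session end backwards.
            lastModified.merge(key, timestamp, Long::max);
        } else {
            lastModified.put(key, timestamp); // original behaviour: last record wins
        }
        timers.computeIfAbsent(timestamp + gap, t -> new TreeSet<>()).add(key);
    }

    void onTimer(String key, long timestamp) {
        Long last = lastModified.get(key);
        if (last == null) {
            staleTimers++; // null guard: state already cleared by an earlier timer
            return;
        }
        if (timestamp == last + gap) {
            lastModified.remove(key); // "accumulatorState.clear()"
            sessionsClosed++;
        }
    }

    // Due timers fire in timestamp order, on the same thread, once the watermark passes them.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.firstKey() <= newWatermark) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    public static void main(String[] args) {
        for (boolean keepMax : new boolean[] {false, true}) {
            SessionTimeoutSim sim = new SessionTimeoutSim(10, keepMax);
            sim.processElement("user-1", 5);
            sim.processElement("user-1", 3); // out of order, still ahead of the watermark
            sim.advanceWatermark(20);        // timers at 13 and 15 fire back to back
            System.out.println("keepMax=" + keepMax + " closed=" + sim.sessionsClosed
                    + " stale=" + sim.staleTimers);
        }
    }
}
```

No second thread is involved: the back-to-back timers alone reproduce the null state.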
>>>>>>>
>>>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I have a few follow-up questions regarding ProcessFunction. I think
>>>>>>>> that the core should take care of any synchronization issues between
>>>>>>>> calls to onElement and onTimer in the case of a keyed stream, but tests
>>>>>>>> do not seem to suggest that.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I have 2 specific questions.
>>>>>>>>
>>>>>>>>
>>>>>>>> 1. Are calls to onElement(..) single threaded if scoped to a key?
>>>>>>>> That is, on a keyed stream, is there a way that 2 or more threads can
>>>>>>>> execute on more than one element of a single key at one time?
>>>>>>>> Would I have to synchronize this construction:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> OUT accumulator = accumulatorState.value();
>>>>>>>> if (accumulator == null) {
>>>>>>>>     accumulator = acc.createNew();
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for
>>>>>>>> the same key? I intend to clean up state, but I see NullPointerExceptions
>>>>>>>> thrown in onTimer(..), and I presume it is because onElement and onTimer
>>>>>>>> are executed on 2 separate threads, with onTimer removing the state
>>>>>>>> (clear()) but after another thread has registered a timer (in
>>>>>>>> onElement).
>>>>>>>>
>>>>>>>>
>>>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>>         accumulatorState.clear();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> PS. This is the full code.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public  void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>>>>     TimerService timerService = context.timerService();
>>>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>>>         if (accumulator == null) {
>>>>>>>>             accumulator = acc.createNew();
>>>>>>>>         }
>>>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>>>         accumulatorState.update(accumulator);
>>>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public  void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>>         accumulatorState.clear();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That's correct. Removal of timers is not supported in
>>>>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>>>>> timers and have a ValueState that stores the valid timestamp on which the
>>>>>>>>> onTimer method should be executed.
>>>>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>>>>> whether the timestamp is the correct one and leaves the method if that is
>>>>>>>>> not the case.
>>>>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>>>>> register multiple timers on (currentWatermark + 1). Since there is only one
>>>>>>>>> timer per timestamp, there is only one timer which gets continuously
>>>>>>>>> overwritten. The timer is called when the watermark is advanced.
>>>>>>>>>
>>>>>>>>> Regarding the performance of the timer service: AFAIK, all methods
>>>>>>>>> that work with some kind of timer use this service, so there is not much choice.
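Both workarounds can be illustrated with a small, Flink-free model of a single key's timer queue that, as described above, keeps at most one timer per timestamp. `TimerQueueModel` and its methods are hypothetical stand-ins, not the TimerService API. Registering `currentWatermark + 1` repeatedly adds only one entry until the watermark moves, and the "valid timestamp" check lets `onTimer` ignore older timers that cannot be removed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key model of an event-time timer queue.
final class TimerQueueModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // at most one timer per timestamp
    long currentWatermark = Long.MIN_VALUE;
    Long validTimestamp;                                   // stands in for a ValueState<Long>
    final List<Long> fired = new ArrayList<>();            // every timer that fired
    final List<Long> handled = new ArrayList<>();          // timers that passed the validity check

    void registerEventTimeTimer(long time) {
        timers.add(time); // registering an existing timestamp again is a no-op
    }

    int pendingTimers() {
        return timers.size();
    }

    // "Move" a timer without deleting the old one: register the new time and
    // remember it as the only valid one.
    void reschedule(long time) {
        validTimestamp = time;
        registerEventTimeTimer(time);
    }

    private void onTimer(long time) {
        fired.add(time);
        if (validTimestamp != null && time == validTimestamp) {
            handled.add(time); // only the latest scheduled timestamp does real work
        }
    }

    // Timers fire in timestamp order once the watermark reaches them.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    public static void main(String[] args) {
        // Trick 1: repeated registrations at currentWatermark + 1 collapse into one timer.
        TimerQueueModel next = new TimerQueueModel();
        next.advanceWatermark(100);
        for (int i = 0; i < 3; i++) {
            next.registerEventTimeTimer(next.currentWatermark + 1);
        }
        System.out.println("pending=" + next.pendingTimers());
        next.advanceWatermark(150);
        System.out.println("fired=" + next.fired);

        // Trick 2: stale timers stay registered but are ignored by the validity check.
        TimerQueueModel session = new TimerQueueModel();
        session.reschedule(50);
        session.reschedule(70);
        session.advanceWatermark(100);
        System.out.println("fired=" + session.fired + " handled=" + session.handled);
    }
}
```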
>>>>>>>>>
>>>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <
>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>
>>>>>>>>>> And that raises a further question: how performant is the timer
>>>>>>>>>> service? I tried to look through the architecture behind it but could
>>>>>>>>>> not find a definite clue. Is it a scheduled service, and if so, how
>>>>>>>>>> many threads does it use, etc.?
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Makes sense. I made a first stab at using ProcessFunction. The
>>>>>>>>>>> TimerService exposed by the Context does not have a way to remove a
>>>>>>>>>>> timer. Is that primarily because a priority queue is the storage and
>>>>>>>>>>> removal from a PriorityQueue is expensive? The TriggerContext does
>>>>>>>>>>> expose a version that can remove timers, so I was wondering about this
>>>>>>>>>>> dissonance...
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <
>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> it is not guaranteed that add() and onElement() receive the
>>>>>>>>>>>> same object, and even if they do it is not guaranteed that a mutation of
>>>>>>>>>>>> the object in onElement() has an effect. The object might have been
>>>>>>>>>>>> serialized and stored in RocksDB.
>>>>>>>>>>>> Hence, elements should not be modified in onElement().
>>>>>>>>>>>>
>>>>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>>>>> This might be more code but easier to design and reason about
>>>>>>>>>>>> because there is no interaction of window assigner, trigger, and window
>>>>>>>>>>>> function.
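Why a mutation made in onElement() can be invisible to add() can be shown without Flink: once an object has been serialized into a state backend (RocksDB in the reply above), later reads produce a fresh copy. This sketch uses plain `java.io` serialization as a stand-in; Flink uses its own type serializers, so the mechanism differs, but the effect on object identity is the same. `WatermarkedPojo`, `CopySemanticsDemo`, and `roundTrip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical record type that carries a watermark field.
final class WatermarkedPojo implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = Long.MIN_VALUE;
}

final class CopySemanticsDemo {
    // Serialize and deserialize, as a state backend that stores bytes would.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        WatermarkedPojo element = new WatermarkedPojo();
        WatermarkedPojo storedInState = roundTrip(element); // what add() put into serialized state
        element.watermark = 42L;                            // onElement() mutates its own reference
        System.out.println(element == storedInState);       // false: two different objects
        System.out.println(storedInState.watermark == Long.MIN_VALUE); // true: mutation not seen
    }
}
```

With an on-heap backend in a local test the same reference may happen to be shared, which is consistent with the local-versus-distributed difference in the original question, but no such sharing is guaranteed.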
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>>>
>>>>>>>>>>>>> is where we could shape what is emitted. Again, for us it seems
>>>>>>>>>>>>> natural to use the WM to materialize micro-batches with
>>>>>>>>>>>>> "approximate" order (and no, I am not a fan of Spark micro-batches :)).
>>>>>>>>>>>>> Any pointers on how we could write an implementation that allows
>>>>>>>>>>>>> "up till WM" emission through a trigger on a session window would be
>>>>>>>>>>>>> very helpful. In essence, I believe this is crucial for any "funnel"
>>>>>>>>>>>>> analysis.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>>>
>>>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Trigger in this case would be some count-based trigger...
>>>>>>>>>>>>>> Again, the motive is to keep the state lean, as we want to search
>>>>>>>>>>>>>> for patterns, sorted on event time, in the incoming sessionized
>>>>>>>>>>>>>> (and thus of nondeterministic length) stream...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>>>>> complain about MergeableWindow and state. The Session class here
>>>>>>>>>>>>>>> encapsulates the trim-up-to-watermark behavior we desire (a reduce
>>>>>>>>>>>>>>> call after telling it the current WM).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void clear(Context context){
>>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think that does not work, as what is needed to make deterministic
>>>>>>>>>>>>>>>> decisions is the WM of the window operator rather than that of an
>>>>>>>>>>>>>>>> operator that precedes the window. This is doable with a
>>>>>>>>>>>>>>>> ProcessWindowFunction using state, but only for non-mergeable
>>>>>>>>>>>>>>>> windows.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The best API option, I think, is a TimeBasedTrigger that fires on
>>>>>>>>>>>>>>>> every configured progression of the WM, and a window implementation
>>>>>>>>>>>>>>>> that materializes *only data up till that WM* (it might hold more
>>>>>>>>>>>>>>>> data, but that data has event time greater than the WM). I am not
>>>>>>>>>>>>>>>> sure we have that as a built-in option, and thus was asking for
>>>>>>>>>>>>>>>> access to the current WM of the window operator, to allow us to
>>>>>>>>>>>>>>>> handle the "*only data up till that WM*" range retrieval using some
>>>>>>>>>>>>>>>> custom data structure.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich
>>>>>>>>>>>>>>>>> records with the current watermark before passing them into the window
>>>>>>>>>>>>>>>>> operator.
>>>>>>>>>>>>>>>>> The context object of the processElement() method gives
>>>>>>>>>>>>>>>>> access to the current watermark and timestamp of a record.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best, Fabian
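A rough model of both points, assuming an operator with several input channels: the operator's watermark is the minimum of the latest watermark seen on each channel, and a ProcessFunction-style step can stamp each record with that value before it reaches the window. `ChannelWatermarks` and the `long[]` {timestamp, watermark} pair are hypothetical. Because the stamped value depends on which channel's watermark arrived first, the same records can be stamped differently between runs, which is the non-determinism mentioned above.

```java
import java.util.Arrays;

// Hypothetical model of an operator that tracks per-channel watermarks.
final class ChannelWatermarks {
    private final long[] perChannel;

    ChannelWatermarks(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // nothing received yet
    }

    // Watermarks only move forward per channel.
    void onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
    }

    // The operator's current watermark is the minimum over all input channels.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : perChannel) {
            min = Math.min(min, w);
        }
        return min;
    }

    // "Enrich" a record: pair its timestamp with the watermark seen right now.
    long[] enrich(long eventTimestamp) {
        return new long[] {eventTimestamp, currentWatermark()};
    }

    public static void main(String[] args) {
        // Same records, two consumption orders.
        ChannelWatermarks a = new ChannelWatermarks(2);
        a.onWatermark(0, 10);
        a.onWatermark(1, 8);
        long[] stampedA = a.enrich(12); // both channels reported: watermark 8

        ChannelWatermarks b = new ChannelWatermarks(2);
        b.onWatermark(0, 10);
        long[] stampedB = b.enrich(12); // channel 1 not reported yet: Long.MIN_VALUE
        b.onWatermark(1, 8);

        System.out.println(Arrays.toString(stampedA) + " vs " + Arrays.toString(stampedB));
    }
}
```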
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>>>>>>>>>>>>>>> Trigger<IN, ...> the same as the IN provided to add(IN value)
>>>>>>>>>>>>>>>>>> in Accumulator<IN, ...>? It seems that any mutations to IN in
>>>>>>>>>>>>>>>>>> onElement() are not visible to the accumulator that is carrying
>>>>>>>>>>>>>>>>>> it as a previous element reference in the next invocation of
>>>>>>>>>>>>>>>>>> add(). This seems to happen only in distributed mode, which makes
>>>>>>>>>>>>>>>>>> sense only if these references point to different objects.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    The Trigger
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I want to augment a POJO in a Trigger's onElement() method;
>>>>>>>>>>>>>>>>>>> specifically, I want to supply the POJO with the watermark from
>>>>>>>>>>>>>>>>>>> the TriggerContext. The sequence of execution is:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. add() is called on the window's accumulator, which saves a
>>>>>>>>>>>>>>>>>>> reference to the POJO.
>>>>>>>>>>>>>>>>>>> 2. onElement() is called on the Trigger.
>>>>>>>>>>>>>>>>>>> 3. The watermark is set on the POJO.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The next add() call should see the last reference, including any
>>>>>>>>>>>>>>>>>>> mutation made in step 3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster (the
>>>>>>>>>>>>>>>>>>> mutation made by onElement() is visible on the POJO in the
>>>>>>>>>>>>>>>>>>> subsequent add()), but not on a distributed cluster. My specific
>>>>>>>>>>>>>>>>>>> question is whether add() on a window's accumulator and
>>>>>>>>>>>>>>>>>>> onElement() of the trigger on that window are executed inline on
>>>>>>>>>>>>>>>>>>> the same thread, or whether some serialization/deserialization or
>>>>>>>>>>>>>>>>>>> IPC causes this divergence (local versus distributed).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
Yep, this is suboptimal, though, as you imagined. Two things:

* <IN> internally has an <INLite>, an ultra-lite version of IN that is
only required for the path analysis.
* Since sessionization is expensive, we piggyback multiple other aggregations
that do not depend on the path or order (count, etc.). Essentially, a Session
is (ordered path + accumulated stats).

The code seems pretty much all right; please tell me if you need to see it.
All generics so no secrets here.








On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> you would not need the ListStateDescriptor. A WindowProcessFunction stores
> all events that are assigned to a window (IN objects in your case) in an
> internal ListState.
> The Iterable<IN> parameter of the process() method iterates over the
> internal list state.
>
> So you would have a Trigger that fires when a new watermark is received
> (or in regular intervals like every minute) and at the end of the window.
> The process() method looks up the current watermark in the Context object,
> traverses the Iterable<IN> filtering out all events with timestamp >
> watermark (you would need to enrich the events with the timestamp which can
> be done in a ProcessFunction), inserting the remaining ones into a sorted
> data structure (possibly leveraging the almost sorted nature of the events)
> and create a Session from it.
>
> This is probably less efficient than your ProcessFunction because
> process() would go over the complete list over and over again and not be
> able to persist the result of previous invocations.
> However, the code should be easier to maintain.
>
> Does that make sense?
>
> Best, Fabian
>
> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> Hello Fabian, Thank you for your response.
>>
>>                      I thought about it and may be am missing something
>> obvious here. The code below is what I think you suggest. The issue is that
>> the window now is a list of Session's ( or shall subsets of the Session).
>>
>> What is required is that on a new watermark
>>
>> * We sort these Session objects
>> * Get the subset that are before the new Watermark and an emit without
>> purge.
>>
>> I do not see how the Trigger approach helps us. It does tell us that the
>> watermark has progressed but to get a subset of the ListState that falls
>> before the watermark, we would need access to *the new value  of the
>> watermark*. That was what my initial query was.
>>
>>
>>
>> public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>
>>
>>     OUT toCreateNew;
>>     Long gap;
>>     private final ListStateDescriptor< OUT> mergingSetsStateDescriptor;
>>
>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>>                                 OUT toCreateNew) {
>>         this.toCreateNew = toCreateNew;
>>         mergingSetsStateDescriptor =
>>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>>     }
>>     @Override
>>     public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
>>         OUT session = toCreateNew.createNew();
>>         elements.forEach(f -> session.add(f));
>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>     }
>> }
>>
>>
>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> thanks for sharing your solution!
>>>
>>> Looking at this issue again and your mail in which you shared your
>>> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
>>> ValueState that prevents the ProcessWindowFunction to be used in a
>>> mergeable window.
>>> You could have created a new Session object in each invocation of the
>>> ProcessWindowFucntion and simply keep the elements in the (mergable) list
>>> state of the window.
>>> In that case you would simply need a custom trigger that calls the
>>> ProcessWindowFunction when a new watermark arrives. For intermediate calls,
>>> you just FIRE and for the final call you FIRE_AND_PURGE to remove the
>>> elements from the window's state.
>>> Did you try that?
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>
>>>> Dear Fabian,
>>>>
>>>>            I was able to create a pretty functional ProcessFunction and
>>>> here is the synopsis and please see if it makes sense.
>>>>
>>>> Sessionization is unique as in it entails windows of dynamic length.
>>>> The way flink approaches is pretty simple. It will create a TimeWindow of
>>>> size "gap" relative to the event time, find an overlapping window (
>>>> intersection ) and create a covering window. Each such window has a "state"
>>>> associated with it, which too has to be merged when a cover window is
>>>> created on intersection of 2 or more incident windows.To be more
>>>> precise if Window1 spans ( t1, t2 ) and a new record creates a window ( t3,
>>>> t4 ) and  t1<=t3<=t2 a new Window is created ( t1, t4 ) and the
>>>> associated states are merged.
>>>>
>>>>
>>>> In the current Window API the states are external and
>>>> Accumulator based. This approach works for pretty much all cases where
>>>> the aggregation is accumulative/reducing and does not depend on order,
>>>> i.e. no ordered list of incoming records needs to be kept and the reduction
>>>> is to a single aggregated element (think counts, min, max etc.). In path
>>>> analysis (and other use cases), however, this approach has drawbacks. Even
>>>> though our accumulator could keep an ordered list of events, that list
>>>> becomes unreasonable if it is not bounded. An approach that does
>>>> *attempt* to bound state is to preemptively analyze paths, using the WM
>>>> as the marker that defines the *subset* of the state that is safe to
>>>> analyze. So if we have n events in the window state and m fall before the
>>>> WM, we can safely analyze the m subset, emitting the paths seen and reducing
>>>> the cumulative state size. There are caveats, though, that I will go into later.
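A rough sketch of the "reduce to WM" idea, assuming the n buffered events of one session sit in a plain list: the m events at or before the watermark are removed, sorted by event time and returned for path analysis, and only the remaining n - m stay in state. The Event type and its page field are hypothetical placeholders for the real records.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical buffer for one session; events are kept until the watermark passes them. */
final class ReduceToWatermark {
    static final class Event {
        final long timestamp;
        final String page;

        Event(long timestamp, String page) {
            this.timestamp = timestamp;
            this.page = page;
        }
    }

    private final List<Event> buffered = new ArrayList<>();

    void add(Event e) {
        buffered.add(e);
    }

    /** Removes and returns, in event-time order, every event at or before the watermark. */
    List<Event> drainUpTo(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event e : buffered) {
            if (e.timestamp <= watermark) {
                ready.add(e);    // the watermark promises no more events at or before it
            } else {
                pending.add(e);  // earlier events may still arrive, keep it in state
            }
        }
        ready.sort(Comparator.comparingLong(ev -> ev.timestamp));
        buffered.clear();
        buffered.addAll(pending); // state shrinks to the n - m events after the watermark
        return ready;
    }

    int bufferedSize() {
        return buffered.size();
    }
}
```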
>>>>
>>>>
>>>>
>>>> Unfortunately, the accumulators used by Flink's default window runtime do
>>>> not have access to the WM.
>>>>
>>>>
>>>> This led to the following generic approach (implemented and tested):
>>>>
>>>>
>>>> * Use a low-level ProcessFunction, which allows access to the WM and is
>>>> definitely nearer to the guts of Flink.
>>>>
>>>>
>>>> * Still use the merge-windows-on-intersection approach, but use the WM to
>>>> trigger (through timers) reductions in state. This is not very
>>>> dissimilar to what Flink does, but we have more control over what to do and
>>>> when to do it. Essentially, I have exposed a lifecycle method that reacts
>>>> to WM progression.
>>>>
>>>>
>>>> * There are essentially 2 timers. The first timer is at the maxTimestamp()
>>>> of a Window, which, if there is no further mutation because of a merge etc.,
>>>> will fire to signal a Session End. The second one is on currentWatermark + 1
>>>> and essentially calls a "reduceToWM" on each keyed Window and thus its State.
>>>>
>>>>
>>>> * There are 2 ways to short-circuit a Session: 1. on Session time span,
>>>> 2. on Session size.
>>>>
>>>>
>>>> * There is a safety valve to blacklist keys when it is obvious that it
>>>> is a bot ( again
>>>>
>>>>
>>>> The solution thus preemptively pushes out patterns (and correct
>>>> patterns) while keeping the ordered state within reasonable bounds. The
>>>> incident data of course has to be analyzed: are the paths too large, etc.
>>>> But one has full control over how to fashion the solution.
>>>>
>>>>
>>>>
>>>>
>>>> Regards and Thanks,
>>>>
>>>>
>>>> Vishal
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> This makes sense.  Thanks.
>>>>>
>>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> all calls to onElement() or onTimer() are synchronized, for all keys.
>>>>>> Think of a single thread calling these methods.
>>>>>> Event-time timers are called when a watermark passes the timer.
>>>>>> Watermarks are received as special records, so the methods are called in
>>>>>> the same order in which records (actual records or watermarks) arrive at
>>>>>> the function. Only for processing-time timers is actual synchronization
>>>>>> required.
>>>>>>
>>>>>> The NPE might be thrown because of two timers that fire one after the
>>>>>> other without a new record being processed in between the onTimer() calls.
>>>>>> In that case the state is cleared in the first call and null in the second.
>>>>>>
>>>>>> Best, Fabian
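A single-threaded, stdlib-only model of the processElement()/onTimer() code quoted below, for one key; Acc, the TreeSet of timers and the field names are hypothetical stand-ins for Flink's ValueState and TimerService. It shows one way two timers can fire back to back: an out-of-order element lowers lastModified, so the earlier timer clears the state and the later one finds it null, with no second thread involved. The null check in onTimer() is the guard; storing max(lastModified, timestamp) instead of the latest timestamp would also avoid this particular case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Single-threaded model of the quoted keyed ProcessFunction, for one key. */
final class TimerNpeModel {
    /** Hypothetical stand-in for the OUT accumulator. */
    static final class Acc {
        long lastModified;
    }

    private final long gap;
    private Acc state;                                     // plays the role of accumulatorState
    private final TreeSet<Long> timers = new TreeSet<>();  // one event-time timer per timestamp
    private long watermark = Long.MIN_VALUE;

    final List<Long> clearedAt = new ArrayList<>();
    int nullStateTimers = 0;

    TimerNpeModel(long gap) {
        this.gap = gap;
    }

    void processElement(long timestamp) {
        if (timestamp > watermark) {
            if (state == null) {
                state = new Acc();
            }
            state.lastModified = timestamp;  // as quoted: the latest record wins, not the max
            timers.add(timestamp + gap);
        }
    }

    /** Fires, in timestamp order, every timer at or before the new watermark. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= newWatermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        Acc acc = state;
        if (acc == null) {
            // Without this guard, the quoted accumulator.getLastModified() call throws the NPE.
            nullStateTimers++;
            return;
        }
        if (timestamp == acc.lastModified + gap) {
            state = null;                    // accumulatorState.clear()
            clearedAt.add(timestamp);
        }
    }
}
```

Usage: with a gap of 5, elements at 12 and then 11 register timers at 17 and 16; a watermark of 20 fires 16 (which clears the state) and then 17, which finds no state.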
>>>>>>
>>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>     I have a few follow up questions regarding ProcessFunction. I
>>>>>>> think that the core should take care of any synchronization issues between
>>>>>>> calls to onElement and onTimer in case of a keyed stream but tests do not
>>>>>>> seem to suggest that.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have 2 specific questions.
>>>>>>>
>>>>>>>
>>>>>>> 1. Are calls to onElement(..) single-threaded when scoped to a key?
>>>>>>> That is, on a keyed stream, is there a way that 2 or more threads
>>>>>>> can process more than one element of a single key at the same time?
>>>>>>> Would I have to synchronize this construction?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> OUT accumulator = accumulatorState.value();
>>>>>>> if (accumulator == null) {
>>>>>>>     accumulator = acc.createNew();
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for
>>>>>>> the same key? I intend to clean up state, but I see NullPointerExceptions
>>>>>>> thrown in onTimer(..), and I presume it is because onElement and onTimer
>>>>>>> are executed on 2 separate threads, with onTimer removing the
>>>>>>> state (clear()) after another thread has registered a timer (in
>>>>>>> onElement).
>>>>>>>
>>>>>>>
>>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>         accumulatorState.clear();
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> PS. This is the full code.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>> public  void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>>>     TimerService timerService = context.timerService();
>>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>>         if (accumulator == null) {
>>>>>>>             accumulator = acc.createNew();
>>>>>>>         }
>>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>>         accumulatorState.update(accumulator);
>>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public  void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>         accumulatorState.clear();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's correct. Removal of timers is not supported in
>>>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>>>> timers and have a ValueState that stores the valid timestamp on which the
>>>>>>>> onTimer method should be executed.
>>>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>>>> whether the timestamp is the correct one and leaves the method if that is
>>>>>>>> not the case.
>>>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>>>> register a timer on (currentWatermark + 1) for every record. Since there
>>>>>>>> is only one timer per key and timestamp, there is only one timer, which
>>>>>>>> gets continuously overwritten. The timer is called when the watermark is advanced.
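The (currentWatermark + 1) trick can be modelled with a sorted set of timestamps standing in for one key's event-time timers. This is a hypothetical, stdlib-only model, not Flink's TimerService: it only mirrors the property described above that registering an already-registered timestamp adds nothing, so however many records arrive between two watermarks, at most one callback fires when the watermark advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical per-key model of the (currentWatermark + 1) timer trick. */
final class WatermarkPlusOneModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // a set: one timer per timestamp
    private long currentWatermark;
    final List<Long> fired = new ArrayList<>();

    WatermarkPlusOneModel(long initialWatermark) {
        this.currentWatermark = initialWatermark;
    }

    /** What processElement() would do for every record of this key. */
    void onRecord() {
        timers.add(currentWatermark + 1); // re-registering the same timestamp is a no-op
    }

    /** A new watermark fires every pending timer at or before it. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
    }

    int pendingTimers() {
        return timers.size();
    }
}
```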
>>>>>>>>
>>>>>>>> Regarding the performance of the timer service: AFAIK, all methods that
>>>>>>>> work with some kind of timer use this service, so there is not much choice.
>>>>>>>>
>>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>
>>>>>>>>> And that raises a further question: how performant is the Timer
>>>>>>>>> Service? I tried to look through the architecture behind it but could not
>>>>>>>>> find a definite clue. Is it a scheduled service, and if so, how many
>>>>>>>>> threads, etc.?
>>>>>>>>>
>>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Makes sense. I took a first stab at using ProcessFunction. The
>>>>>>>>>> TimerService exposed by the Context does not have a way to remove a timer.
>>>>>>>>>> Is that primarily because a priority queue is the storage and removal from
>>>>>>>>>> a PriorityQueue is expensive? The TriggerContext does expose a version
>>>>>>>>>> that can remove timers, so I was wondering about this dissonance...
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>>>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>>>>>>>> object in onElement() has an effect. The object might have been serialized
>>>>>>>>>>> and stored in RocksDB.
>>>>>>>>>>> Hence, elements should not be modified in onElement().
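To illustrate why the mutation can get lost, here is a round trip through java.io serialization, used only as a stand-in: Flink serializes state with its own TypeSerializers, and the Event class is hypothetical. Once a copy has been written out, as it would be into RocksDB, changing the original object afterwards does not change what was stored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Illustration of copy semantics using java.io serialization (Flink uses its own serializers). */
final class CopySemanticsDemo {
    /** Hypothetical element type with a field a trigger might try to set. */
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark;
    }

    /** Serializes and deserializes a value, returning an independent copy. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Usage: take the stored copy first (what add() effectively left in state), then set a watermark on the original (what onElement() does); the stored copy still has the old value.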
>>>>>>>>>>>
>>>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>>>> This might be more code, but it is easier to design and reason about
>>>>>>>>>>> because there is no interaction of window assigner, trigger, and window
>>>>>>>>>>> function.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>>
>>>>>>>>>>>> is where we could shape what is emitted. Again, for us
>>>>>>>>>>>> it seems natural to use the WM to materialize micro-batches with
>>>>>>>>>>>> "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any
>>>>>>>>>>>> pointers on how we could write an implementation that allows for "up
>>>>>>>>>>>> till WM emission" through a trigger on a Session Window would be very
>>>>>>>>>>>> helpful. In essence, I believe that for any "funnel" analysis it is crucial.
>>>>>>>>>>>>
>>>>>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>>
>>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The Trigger in this case would be some count-based Trigger....
>>>>>>>>>>>>> Again, the motive is to keep the state lean, as we want to search for
>>>>>>>>>>>>> patterns, sorted on event time, in the incoming sessionized (and thus of
>>>>>>>>>>>>> nondeterministic length) stream....
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>>>> complain about MergeableWindow and state. The Session class in this
>>>>>>>>>>>>>> example encapsulates the trim-up-to-watermark behavior we desire (a
>>>>>>>>>>>>>> reduce call after telling it the current WM):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void clear(Context context){
>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  I think that does not work, as it is the WM of the Window
>>>>>>>>>>>>>>> Operator that is needed to make deterministic decisions, rather than
>>>>>>>>>>>>>>> that of an operator that precedes the Window. This is doable with a
>>>>>>>>>>>>>>> ProcessWindowFunction using state, but only in the case of non-mergeable
>>>>>>>>>>>>>>> windows.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    The best API option, I think, is a time-based Trigger that
>>>>>>>>>>>>>>> fires on every configured time progression of the WM, and a Window
>>>>>>>>>>>>>>> implementation that materializes *only data up till that WM* (it might
>>>>>>>>>>>>>>> have more data, but that data has event time greater than the WM). I am
>>>>>>>>>>>>>>> not sure we have that option built in, and thus was asking for access to
>>>>>>>>>>>>>>> the current WM of the window operator to allow us to handle the "*only
>>>>>>>>>>>>>>> data up till that WM*" range retrieval using some custom
>>>>>>>>>>>>>>> data structure.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich
>>>>>>>>>>>>>>>> records with the current watermark before passing them into the window
>>>>>>>>>>>>>>>> operator.
>>>>>>>>>>>>>>>> The context object of the processElement() method gives
>>>>>>>>>>>>>>>> access to the current watermark and timestamp of a record.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an operator.
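A simplified model of that non-determinism, assuming an operator with two input channels: its watermark is the minimum of the latest watermarks received on each channel, so a record processed after the same number of watermark messages can observe different values depending on which channel delivered them. The class is hypothetical and ignores details of Flink's real implementation.

```java
import java.util.Arrays;

/** Hypothetical model of an operator's watermark with several input channels. */
final class MultiInputWatermark {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel and returns the operator's resulting watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min); // the operator's watermark never moves backwards
        return current;
    }

    long current() {
        return current;
    }
}
```

Usage: if channel 0 sends 10 and then channel 1 sends 10, the operator is at 10; if channel 0 instead sends 10 and then 20, the operator is still at Long.MIN_VALUE, so a record arriving at that point sees a different "current watermark".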
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element..)
>>>>>>>>>>>>>>>>> in Trigger<IN,..> the same as the IN provided to add(IN
>>>>>>>>>>>>>>>>> value) in Accumulator<IN,..>? It seems that mutations to
>>>>>>>>>>>>>>>>> IN in onElement() are not visible to the Accumulator that
>>>>>>>>>>>>>>>>> is carrying it as a previous element reference in the next
>>>>>>>>>>>>>>>>> invocation of add(). This seems to happen only in distributed mode, which
>>>>>>>>>>>>>>>>> makes sense only if these references point to different objects.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    The Trigger
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>>>>>>>> specifically supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>>>>>>>> The sequence of execution is:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window and
>>>>>>>>>>>>>>>>>> save the POJO reference in the Accumulator.
>>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The next add() call should see the last reference and
>>>>>>>>>>>>>>>>>> any mutation done in step 3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> That works in a local test case using
>>>>>>>>>>>>>>>>>> LocalFlinkMiniCluster, in that I have access to the mutation made by
>>>>>>>>>>>>>>>>>> onElement() to the POJO in the subsequent add(), but not on a distributed
>>>>>>>>>>>>>>>>>> cluster. The specific question I had is whether add() on a supplied
>>>>>>>>>>>>>>>>>> accumulator on a window and the onElement() method of the trigger on that
>>>>>>>>>>>>>>>>>> window are inline executions on the same thread, or whether there is any
>>>>>>>>>>>>>>>>>> serialization/deserialization IPC that causes this divergence (local
>>>>>>>>>>>>>>>>>> versus distributed).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

you would not need the ListStateDescriptor. With a ProcessWindowFunction, the
window operator stores all events that are assigned to a window (IN objects in
your case) in an internal ListState.
The Iterable<IN> parameter of the process() method iterates over this
internal list state.

So you would have a Trigger that fires when a new watermark is received (or
at regular intervals, like every minute) and at the end of the window.
The process() method looks up the current watermark in the Context object,
traverses the Iterable<IN>, filters out all events with timestamp >
watermark (you would need to enrich the events with the timestamp, which can
be done in a ProcessFunction), inserts the remaining ones into a sorted
data structure (possibly leveraging the almost sorted nature of the events),
and creates a Session from it.
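A sketch of the filtering and sorting part of that process() body, assuming each event has already been reduced to its enriched timestamp (the class and method names are hypothetical, and building the Session itself is left out). An insertion sort is used because it is cheap on input that is almost in event-time order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: timestamps stand in for the enriched IN objects. */
final class AlmostSortedSession {
    /** Keeps events at or before the watermark, in event-time order. */
    static List<Long> readyInOrder(Iterable<Long> windowContents, long watermark) {
        List<Long> ready = new ArrayList<>();
        for (long ts : windowContents) {
            if (ts > watermark) {
                continue; // not yet safe to analyze; it remains in the window's list state
            }
            // Insertion step: walk back from the tail. For nearly sorted input
            // this stops after a step or two, so the whole pass is close to linear.
            int i = ready.size();
            while (i > 0 && ready.get(i - 1) > ts) {
                i--;
            }
            ready.add(i, ts);
        }
        return ready;
    }
}
```

Usage: for window contents 1, 3, 2, 7, 4, 12 and a watermark of 5, the result is 1, 2, 3, 4; the events at 7 and 12 are simply skipped until a later firing.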

This is probably less efficient than your ProcessFunction because process()
would go over the complete list over and over again and not be able to
persist the result of previous invocations.
However, the code should be easier to maintain.

Does that make sense?

Best, Fabian

2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> Hello Fabian, Thank you for your response.
>
>                      I thought about it and maybe I am missing something
> obvious here. The code below is what I think you suggest. The issue is that
> the window now is a list of Sessions (or, shall we say, subsets of the Session).
>
> What is required is that on a new watermark
>
> * We sort these Session objects
> * Get the subset that falls before the new watermark and emit it without
> purging.
>
> I do not see how the Trigger approach helps us. It does tell us that the
> watermark has progressed, but to get the subset of the ListState that falls
> before the watermark, we would need access to *the new value of the
> watermark*. That was my initial query.
>
>
>
> public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>
>
>     OUT toCreateNew;
>     Long gap;
>     private final ListStateDescriptor< OUT> mergingSetsStateDescriptor;
>
>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>                                 OUT toCreateNew) {
>         this.toCreateNew = toCreateNew;
>         mergingSetsStateDescriptor =
>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>     }
>     @Override
>     public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
>         OUT session = toCreateNew.createNew();
>         elements.forEach(f -> session.add(f));
>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>     }
> }
>
>
> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> thanks for sharing your solution!
>>
>> Looking at this issue again and at the mail in which you shared your
>> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
>> ValueState that prevents the ProcessWindowFunction from being used with a
>> mergeable window.
>> You could have created a new Session object in each invocation of the
>> ProcessWindowFunction and simply kept the elements in the (mergeable) list
>> state of the window.
>> In that case you would only need a custom trigger that calls the
>> ProcessWindowFunction when a new watermark arrives. For intermediate calls
>> you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
>> elements from the window's state.
>> Did you try that?
>>
>> Best, Fabian
>>
>>
>>
>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> Dear Fabian,
>>>
>>>            I was able to create a pretty functional ProcessFunction and
>>> here is the synopsis and please see if it makes sense.
>>>
>>> Sessionization is unique in that it entails windows of dynamic length. The
>>> way Flink approaches it is pretty simple. It creates a TimeWindow of size
>>> "gap" relative to the event time, finds an overlapping window (intersection)
>>> and creates a covering window. Each such window has a "state" associated
>>> with it, which also has to be merged when a covering window is created from
>>> the intersection of 2 or more incident windows. To be more precise, if
>>> Window1 spans (t1, t2), a new record creates a window (t3, t4), and
>>> t1 <= t3 <= t2, a covering Window (t1, max(t2, t4)) is created and the
>>> associated states are merged.
>>>
>>>
>>> In the current Window API the states are external and
>>> Accumulator based. This approach works for pretty much all cases where
>>> the aggregation is accumulative/reducing and does not depend on order,
>>> i.e. no ordered list of incoming records needs to be kept and the reduction
>>> is to a single aggregated element (think counts, min, max etc.). In path
>>> analysis (and other use cases), however, this approach has drawbacks. Even
>>> though our accumulator could keep an ordered list of events, that list
>>> becomes unreasonable if it is not bounded. An approach that does
>>> *attempt* to bound state is to preemptively analyze paths, using the WM
>>> as the marker that defines the *subset* of the state that is safe to
>>> analyze. So if we have n events in the window state and m fall before the
>>> WM, we can safely analyze the m subset, emitting the paths seen and reducing
>>> the cumulative state size. There are caveats, though, that I will go into later.
>>>
>>>
>>>
>>> Unfortunately, the accumulators used by Flink's default window runtime do
>>> not have access to the WM.
>>>
>>>
>>> This led to the following generic approach (implemented and tested):
>>>
>>>
>>> * Use a low-level ProcessFunction, which allows access to the WM and is
>>> definitely nearer to the guts of Flink.
>>>
>>>
>>> * Still use the merge-windows-on-intersection approach, but use the WM to
>>> trigger (through timers) reductions in state. This is not very
>>> dissimilar to what Flink does, but we have more control over what to do and
>>> when to do it. Essentially, I have exposed a lifecycle method that reacts
>>> to WM progression.
>>>
>>>
>>> * There are essentially 2 timers. The first timer is at the maxTimestamp()
>>> of a Window, which, if there is no further mutation because of a merge etc.,
>>> will fire to signal a Session End. The second one is on currentWatermark + 1
>>> and essentially calls a "reduceToWM" on each keyed Window and thus its State.
>>>
>>>
>>> * There are 2 ways to short-circuit a Session: 1. on Session time span,
>>> 2. on Session size.
>>>
>>>
>>> * There is a safety valve to blacklist keys when it is obvious that it
>>> is a bot ( again
>>>
>>>
>>> The solution thus preemptively pushes out patterns (and correct
>>> patterns) while keeping the ordered state within reasonable bounds. The
>>> incident data of course has to be analyzed: are the paths too large, etc.
>>> But one has full control over how to fashion the solution.
>>>
>>>
>>>
>>>
>>> Regards and Thanks,
>>>
>>>
>>> Vishal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> This makes sense.  Thanks.
>>>>
>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> all calls to onElement() or onTimer() are synchronized, for all keys.
>>>>> Think of a single thread calling these methods.
>>>>> Event-time timers are called when a watermark passes the timer.
>>>>> Watermarks are received as special records, so the methods are called in
>>>>> the same order in which records (actual records or watermarks) arrive at
>>>>> the function. Only for processing-time timers is actual synchronization
>>>>> required.
>>>>>
>>>>> The NPE might be thrown because of two timers that fire one after the
>>>>> other without a new record being processed in between the onTimer() calls.
>>>>> In that case the state is cleared in the first call and null in the second.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>     I have a few follow up questions regarding ProcessFunction. I
>>>>>> think that the core should take care of any synchronization issues between
>>>>>> calls to onElement and onTimer in case of a keyed stream but tests do not
>>>>>> seem to suggest that.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have 2 specific questions.
>>>>>>
>>>>>>
>>>>>> 1. Are calls to onElement(..) single-threaded when scoped to a key?
>>>>>> That is, on a keyed stream, is there a way that 2 or more threads
>>>>>> can process more than one element of a single key at the same time?
>>>>>> Would I have to synchronize this construction?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> OUT accumulator = accumulatorState.value();
>>>>>> if (accumulator == null) {
>>>>>>     accumulator = acc.createNew();
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for
>>>>>> the same key? I intend to clean up state, but I see NullPointerExceptions
>>>>>> thrown in onTimer(..), and I presume it is because onElement and onTimer
>>>>>> are executed on 2 separate threads, with onTimer removing the state
>>>>>> (clear()) after another thread has registered a timer (in onElement).
>>>>>>
>>>>>>
>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>         accumulatorState.clear();
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> PS. This is the full code.
>>>>>>
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>> public  void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>>     TimerService timerService = context.timerService();
>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>         if (accumulator == null) {
>>>>>>             accumulator = acc.createNew();
>>>>>>         }
>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>         accumulatorState.update(accumulator);
>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public  void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>         accumulatorState.clear();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That's correct. Removal of timers is not supported in
>>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>>> timers and have a ValueState that stores the valid timestamp on which the
>>>>>>> onTimer method should be executed.
>>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>>> whether the timestamp is the correct one and leaves the method if that is
>>>>>>> not the case.
>>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>>> register a timer on (currentWatermark + 1) for every record. Since there
>>>>>>> is only one timer per key and timestamp, there is only one timer, which
>>>>>>> gets continuously overwritten. The timer is called when the watermark is advanced.
>>>>>>>
>>>>>>> Regarding the performance of the timer service: AFAIK, all methods that
>>>>>>> work with some kind of timer use this service, so there is not much choice.
>>>>>>>
>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>
>>>>>>>> And that raises a further question: how performant is the Timer
>>>>>>>> Service? I tried to look through the architecture behind it but could not
>>>>>>>> find a definite clue. Is it a scheduled service, and if so, how many
>>>>>>>> threads, etc.?
>>>>>>>>
>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Makes sense. I took a first stab at using ProcessFunction. The
>>>>>>>>> TimerService exposed by the Context does not have a way to remove a timer.
>>>>>>>>> Is that primarily because a priority queue is the storage and removal from
>>>>>>>>> a PriorityQueue is expensive? The TriggerContext does expose a version
>>>>>>>>> that can remove timers, so I was wondering about this dissonance...
>>>>>>>>>
>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vishal,
>>>>>>>>>>
>>>>>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>>>>>>> object in onElement() has an effect. The object might have been serialized
>>>>>>>>>> and stored in RocksDB.
>>>>>>>>>> Hence, elements should not be modified in onElement().
>>>>>>>>>>
>>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>>> This might be more code, but it is easier to design and reason about
>>>>>>>>>> because there is no interaction of window assigner, trigger, and window
>>>>>>>>>> function.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>
>>>>>>>>>>> is where we could shape what is emitted. Again, for us it seems
>>>>>>>>>>> natural to use the WM to materialize micro-batches with "approximate"
>>>>>>>>>>> order (and no, I am not a fan of Spark micro-batches :)). Any pointers
>>>>>>>>>>> on how we could write an implementation that allows "emit up to the WM"
>>>>>>>>>>> through a trigger on a session window would be very helpful. In
>>>>>>>>>>> essence, I believe this is crucial for any "funnel" analysis.
>>>>>>>>>>>
>>>>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>
>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The Trigger in this case would be some count-based trigger.
>>>>>>>>>>>> Again, the motive is to keep the state lean, as we want to search for
>>>>>>>>>>>> patterns, sorted on event time, in the incoming sessionized (and thus
>>>>>>>>>>>> of nondeterministic length) stream.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>>> complain about MergeableWindow and state. The Session class here
>>>>>>>>>>>>> encapsulates the trim-up-to-watermark behavior we want (a reduce() call
>>>>>>>>>>>>> after telling it the current WM):
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>>
>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public void clear(Context context){
>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think that does not work, as it is the WM of the window
>>>>>>>>>>>>>> operator that is needed to make deterministic decisions, rather than the
>>>>>>>>>>>>>> WM of an operator that precedes the window. This is doable with a
>>>>>>>>>>>>>> ProcessWindowFunction using state, but only for non-mergeable
>>>>>>>>>>>>>> windows.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The best API option, I think, is a time-based trigger that
>>>>>>>>>>>>>> fires on every configured progression of the WM, and a window implementation
>>>>>>>>>>>>>> that materializes *only data up till that WM* (it might
>>>>>>>>>>>>>> have more data, but that data has event time greater than the WM). I am not
>>>>>>>>>>>>>> sure we have that as a built-in option, and thus was asking for access to the
>>>>>>>>>>>>>> current WM of the window operator, to allow us to handle the "*only
>>>>>>>>>>>>>> data up till that WM*" range retrieval using some custom
>>>>>>>>>>>>>> data structure.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records
>>>>>>>>>>>>>>> with the current watermark before passing them into the window operator.
>>>>>>>>>>>>>>> The context object of the processElement() method gives
>>>>>>>>>>>>>>> access to the current watermark and timestamp of a record.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>>>>>>>>>>>>> Trigger<IN, ...> the same IN that is provided to add(IN
>>>>>>>>>>>>>>>> value) in Accumulator<IN, ...>? It seems that mutations
>>>>>>>>>>>>>>>> to IN in onElement() are not visible to the Accumulator that carries
>>>>>>>>>>>>>>>> it as a previous-element reference in the next invocation of add().
>>>>>>>>>>>>>>>> This seems to happen only in distributed mode, which makes sense only if
>>>>>>>>>>>>>>>> these references point to different objects.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>    The Trigger
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I want to augment a POJO in a Trigger's onElement method,
>>>>>>>>>>>>>>>>> specifically supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>>>>>>> The sequence of execution is:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window, which
>>>>>>>>>>>>>>>>> saves the POJO reference in the Accumulator.
>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The next add() call should see the last reference, including
>>>>>>>>>>>>>>>>> any mutation done in step 3.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That works in a local test case using
>>>>>>>>>>>>>>>>> LocalFlinkMiniCluster, in that the mutation made by onElement()
>>>>>>>>>>>>>>>>> is visible in the POJO in the subsequent add(), but not on a distributed
>>>>>>>>>>>>>>>>> cluster. My specific question is whether add() on a supplied
>>>>>>>>>>>>>>>>> accumulator of a window and the onElement() method of the trigger on that
>>>>>>>>>>>>>>>>> window are inline executions on the same thread, or whether there is any
>>>>>>>>>>>>>>>>> serialization/deserialization or IPC that causes this divergence (local
>>>>>>>>>>>>>>>>> versus distributed).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
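The timer advice quoted above (at most one timer per timestamp, so repeatedly registering currentWatermark + 1 leaves a single timer that fires when the watermark advances, plus the workaround of ignoring stale timers because ProcessFunction timers cannot be removed) can be illustrated with a small plain-Java simulation. This is a minimal sketch for a single key, not Flink's TimerService: the class TimerQueueSketch, its methods, and the validTimestamp field standing in for a ValueState<Long> are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, single-key model of an event-time timer queue. This is NOT
// Flink's TimerService; it only mimics two properties described in the thread:
// at most one timer per timestamp, and timers fire once the watermark reaches them.
class TimerQueueSketch {
    // A sorted set: registering the same timestamp twice keeps a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;
    // Stand-in for a ValueState<Long> holding the only timestamp onTimer() should act on.
    private Long validTimestamp = null;
    private final List<Long> handled = new ArrayList<>();

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    int pendingTimers() {
        return timers.size();
    }

    // "Register many timers, remember the valid one": a later call supersedes an earlier one.
    void scheduleSessionEnd(long timestamp) {
        validTimestamp = timestamp;
        registerEventTimeTimer(timestamp);
    }

    // Advancing the watermark fires every timer with timestamp <= watermark, in order.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        // Stale timers (superseded registrations) are simply ignored.
        if (validTimestamp == null || timestamp != validTimestamp) {
            return;
        }
        handled.add(timestamp);
        validTimestamp = null; // comparable to clearing the state
    }

    List<Long> handled() {
        return handled;
    }
}
```

The TreeSet is used only because it gives both deduplication and ordering in a few lines; how Flink actually stores its timers is not shown here.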

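Fabian's answer quoted above (add() and onElement() are not guaranteed to receive the same object, and the object may have been serialized, e.g. into RocksDB) can be illustrated with plain Java serialization. This is a minimal sketch under stated assumptions: WmEvent and SerializingStore are hypothetical, and java.io serialization stands in for Flink's own serializers. A non-serializing setup may hand back the same object, which is one plausible reason the mutation appeared to work on LocalFlinkMiniCluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical element type; setWaterMark mirrors the method used in the thread's trigger.
class WmEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = Long.MIN_VALUE;

    void setWaterMark(long watermark) {
        this.watermark = watermark;
    }
}

// Toy state store that keeps only serialized bytes, as a serializing backend
// such as RocksDB does, and returns a fresh copy on every read.
class SerializingStore {
    private byte[] bytes;

    void put(WmEvent event) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bytes = bos.toByteArray();
    }

    WmEvent get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (WmEvent) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Mutating the object returned by get() changes only that copy; the stored bytes, and therefore the next read, still carry the old value.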
Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
Hello Fabian, Thank you for your response.

I thought about it and maybe I am missing something obvious here. The code
below is what I think you suggest. The issue is that the window now holds a
list of Sessions (or rather, subsets of the Session).

What is required is that, on a new watermark:

* We sort these Session objects
* We get the subset that falls before the new watermark and emit it without
purging.

I do not see how the Trigger approach helps us. It does tell us that the
watermark has progressed, but to get the subset of the ListState that falls
before the watermark, we would need access to *the new value of the
watermark*. That was my initial query.
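The two steps listed above (sort, then take the part that precedes the new watermark) can be sketched in plain Java, independent of Flink. TimedEvent and WatermarkSplitter are hypothetical names; events with timestamp <= watermark are treated as safe, following Flink's convention that a watermark t means no more elements with a timestamp <= t are expected.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event carrying an event-time timestamp (a stand-in for the thread's HasTime).
class TimedEvent {
    final String name;
    final long timestamp;

    TimedEvent(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

class WatermarkSplitter {
    // Sorts the buffer by event time, then removes and returns every event with
    // timestamp <= watermark; later events stay buffered for the next watermark.
    static List<TimedEvent> drainUpTo(List<TimedEvent> buffer, long watermark) {
        buffer.sort(Comparator.comparingLong((TimedEvent e) -> e.timestamp));
        int split = 0;
        while (split < buffer.size() && buffer.get(split).timestamp <= watermark) {
            split++;
        }
        List<TimedEvent> ready = new ArrayList<>(buffer.subList(0, split));
        // Only the safe prefix leaves the buffer; the rest of the window is not purged.
        buffer.subList(0, split).clear();
        return ready;
    }
}
```

Sorting on every call keeps the sketch short; a structure that stays ordered on insert would avoid re-sorting if the buffer is large.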



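import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// HasTime, HasKey and SessionState are application-specific types.
public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>>
        extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
                                OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor =
                new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements,
                        Collector<OUT> out) throws Exception {
        // Build a fresh session from the elements currently in the window ...
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        // ... and append it to the per-window list state (nothing is emitted to out here).
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}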



Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again and at the mail in which you shared your
SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
ValueState that prevents the ProcessWindowFunction from being used with a
mergeable window.
You could create a new Session object in each invocation of the
ProcessWindowFunction and simply keep the elements in the (mergeable) list
state of the window.
In that case, you would only need a custom trigger that calls the
ProcessWindowFunction when a new watermark arrives. For intermediate calls
you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
elements from the window's state.
Did you try that?
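The firing policy described above can be modeled outside of Flink as a small state machine. This is a sketch under assumptions: `Decision` is a hypothetical stand-in for Flink's `TriggerResult`, and `windowMaxTimestamp` mirrors `TimeWindow.maxTimestamp()`. A real trigger would register event-time timers (for example at currentWatermark + 1) and make this decision in its timer callback; none of that Flink API is used here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the TriggerResult values used in this sketch. */
enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

/**
 * Simulates a trigger that evaluates the window on every watermark advance
 * and purges the window's elements once the watermark reaches the window end.
 */
class WatermarkFiringPolicy {
    private final long windowMaxTimestamp; // mirrors TimeWindow.maxTimestamp()
    private long lastSeenWatermark = Long.MIN_VALUE;

    WatermarkFiringPolicy(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    /** Called whenever a new watermark reaches the (simulated) window operator. */
    Decision onWatermark(long watermark) {
        if (watermark <= lastSeenWatermark) {
            return Decision.CONTINUE; // no progress, nothing new to evaluate
        }
        lastSeenWatermark = watermark;
        if (watermark >= windowMaxTimestamp) {
            return Decision.FIRE_AND_PURGE; // final call: emit, then drop the elements
        }
        return Decision.FIRE; // intermediate call: emit, keep the elements
    }

    public static void main(String[] args) {
        WatermarkFiringPolicy policy = new WatermarkFiringPolicy(100L);
        List<Decision> decisions = new ArrayList<>();
        for (long wm : new long[] {10L, 10L, 50L, 100L}) {
            decisions.add(policy.onWatermark(wm));
        }
        System.out.println(decisions); // [FIRE, CONTINUE, FIRE, FIRE_AND_PURGE]
    }
}
```

Because the elements stay in the window's (mergeable) list state until the final FIRE_AND_PURGE, each intermediate evaluation can rebuild a fresh Session object from them, which is what removes the need for per-window ValueState.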

Best, Fabian




Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
Dear Fabian,

I was able to build a fairly functional ProcessFunction. Here is the
synopsis; please let me know if it makes sense.

Sessionization is unusual in that it involves windows of dynamic length.
Flink's approach is fairly simple: it creates a TimeWindow of size "gap"
relative to the event time, finds an overlapping window (intersection), and
creates a covering window. Each such window has "state" associated with it,
which also has to be merged when a covering window is created from the
intersection of two or more incident windows. More precisely, if Window1
spans (t1, t2) and a new record creates a window (t3, t4) with t1 <= t3 <= t2,
a new window (t1, t4) is created and the associated states are merged.
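The merge rule above can be illustrated with a small batch version. It is a simplified sketch, not Flink's implementation: Flink merges incrementally as each element arrives, whereas this code assigns (ts, ts + gap) to every event up front and then sweeps over the windows sorted by start. `SessionInterval` and `SessionMerger` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a time window [start, end); not Flink's TimeWindow class. */
final class SessionInterval {
    final long start;
    final long end;

    SessionInterval(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Overlap rule: the two windows intersect (touching counts as intersecting). */
    boolean intersects(SessionInterval other) {
        return start <= other.end && other.start <= end;
    }

    /** The covering window of two intersecting windows. */
    SessionInterval cover(SessionInterval other) {
        return new SessionInterval(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "(" + start + ", " + end + ")";
    }
}

final class SessionMerger {
    /** Assigns (ts, ts + gap) to every event, then merges intersecting windows into covers. */
    static List<SessionInterval> sessions(long[] eventTimes, long gap) {
        List<SessionInterval> windows = new ArrayList<>();
        for (long ts : eventTimes) {
            windows.add(new SessionInterval(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        List<SessionInterval> merged = new ArrayList<>();
        SessionInterval current = null;
        for (SessionInterval w : windows) {
            if (current == null) {
                current = w;
            } else if (current.intersects(w)) {
                current = current.cover(w); // the windows' states would be merged here as well
            } else {
                merged.add(current);
                current = w;
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }

    public static void main(String[] args) {
        // gap = 10: events at 1 and 5 form one session (1, 15); the event at 30 starts a new one.
        System.out.println(sessions(new long[] {5, 1, 30}, 10)); // [(1, 15), (30, 40)]
    }
}
```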


In the current Window API, state is external and accumulator-based. This
approach works for essentially all cases where the aggregation is
accumulative/reducing and does not depend on order, i.e. no ordered list of
incoming records needs to be kept and the reduction is to a single
aggregated value (think counts, min, max, etc.). For path analysis (and
other use cases), however, this approach has drawbacks. Although our
accumulator could keep an ordered list of events, that becomes unreasonable
if the list is not bounded. One approach that does *attempt* to bound the
state is to analyze paths preemptively, using the WM as the marker that
defines the *subset* of the state that is safe to analyze. So if we have n
events in the window state and m of them fall before the WM, we can safely
analyze that subset of m, emitting the paths seen and reducing the
cumulative state size. There are caveats, though, which I will go into later.
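A minimal sketch of this watermark-bounded reduction, assuming a session's events are buffered in a `TreeMap` keyed by event time. `WatermarkBoundedPath` and `reduceToWatermark` are hypothetical names (the latter standing in for the "reduceToWM" step), and the "analysis" is reduced to returning the safe prefix in event-time order. Events at or behind a watermark that has already been reduced are rejected as late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical per-session state that keeps events ordered by event time and, when the
 * watermark advances, hands over only the prefix at or before the watermark for analysis.
 */
final class WatermarkBoundedPath {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // event time -> steps
    private long lastWatermark = Long.MIN_VALUE;

    /** Buffers a step; returns false for late steps at or behind the last reduced watermark. */
    boolean add(long eventTime, String step) {
        if (eventTime <= lastWatermark) {
            return false;
        }
        pending.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(step);
        return true;
    }

    /** "reduceToWM": removes and returns the m of n steps with event time <= watermark, in order. */
    List<String> reduceToWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        NavigableMap<Long, List<String>> safe = pending.headMap(watermark, true); // live view
        List<String> prefix = new ArrayList<>();
        for (List<String> steps : safe.values()) {
            prefix.addAll(steps);
        }
        safe.clear(); // clearing the view removes those entries from the underlying state
        return prefix;
    }

    /** Number of buffered steps still waiting for the watermark (the remaining n - m). */
    int pendingSize() {
        int n = 0;
        for (List<String> steps : pending.values()) {
            n += steps.size();
        }
        return n;
    }

    public static void main(String[] args) {
        WatermarkBoundedPath path = new WatermarkBoundedPath();
        path.add(3, "checkout");
        path.add(1, "home");
        path.add(2, "cart");
        path.add(7, "confirm");
        System.out.println(path.reduceToWatermark(2)); // [home, cart]
        System.out.println(path.pendingSize());        // 2
    }
}
```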


Unfortunately, the accumulators used by Flink's default window runtime do not
have access to the WM.


This led to the following generic approach (implemented and tested):


* Use a low-level ProcessFunction, which gives access to the WM and is
definitely closer to the guts of Flink.


* Still merge windows on intersection, but use the WM to trigger (through
timers) reductions in state. This is not very different from what Flink
does, but we have more control over what to do and when to do it.
Essentially, I have exposed a lifecycle method that reacts to WM
progression.


* There are essentially two timers. The first is at the maxTimestamp() of a
window, which, if there is no further mutation (because of a merge, etc.),
fires to signal the end of a session. The second is on currentWatermark + 1
and calls a "reduceToWM" on each keyed window, and thus on its state.


* There are two ways to short-circuit a session: 1. on session time span,
2. on session size.


* There is a safety valve to blacklist keys when it is obvious that it is a
bot ( again
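The two-timer scheme in the list above can be simulated for a single key without Flink. This is a sketch under assumptions: a `TreeSet` stands in for the timer service (so there is at most one timer per timestamp), nullable fields stand in for keyed `ValueState`, only one open session per key is modeled, and the window's max timestamp is taken as `t + gap - 1` for a window [t, t + gap). All class and method names are hypothetical. Timers whose timestamp no longer matches the stored state are treated as stale and ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Single-key simulation of the two-timer scheme. Timer 1 sits at the session window's
 * max timestamp (session end); timer 2 sits at currentWatermark + 1, so it fires on the
 * next watermark advance and triggers a "reduceToWM". Hypothetical names; no Flink API.
 */
final class TwoTimerSession {
    private final long gap;
    private final TreeSet<Long> timers = new TreeSet<>(); // a set: one timer per timestamp
    private long watermark = Long.MIN_VALUE;
    private Long sessionEnd = null;  // plays the role of keyed ValueState
    private Long reduceTimer = null; // timestamp of the currently valid reduce timer
    final List<String> log = new ArrayList<>();

    TwoTimerSession(long gap) {
        this.gap = gap;
    }

    void processElement(long eventTime) {
        if (eventTime <= watermark) {
            return; // late element: dropped, as in the processElement shown in this thread
        }
        long end = eventTime + gap - 1; // window [t, t + gap) has max timestamp t + gap - 1
        sessionEnd = (sessionEnd == null) ? end : Math.max(sessionEnd, end);
        timers.add(sessionEnd);
        reduceTimer = watermark + 1;
        timers.add(reduceTimer);
    }

    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        // Event-time timers fire in timestamp order once the watermark reaches them.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long t) {
        if (sessionEnd != null && t == sessionEnd) {
            log.add("sessionEnd@" + t);
            sessionEnd = null; // clear state for this key
            reduceTimer = null;
        } else if (reduceTimer != null && t == reduceTimer) {
            log.add("reduceToWM@" + watermark);
            reduceTimer = null;
        }
        // Any other timestamp is stale (e.g. a session end that a later event pushed out).
    }

    public static void main(String[] args) {
        TwoTimerSession s = new TwoTimerSession(10);
        s.processElement(1);
        s.processElement(5);
        s.advanceWatermark(6);
        s.processElement(8);
        s.advanceWatermark(20);
        System.out.println(s.log); // [reduceToWM@6, reduceToWM@20, sessionEnd@17]
    }
}
```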


The solution thus preemptively pushes out patterns (and correct
patterns) while keeping the ordered state within reasonable bounds. The
incident data, of course, still has to be analyzed: are the paths too large,
etc.? But one has full control over how to fashion the solution.




Regards and thanks,
Vishal

On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> This makes sense.  Thanks.
>
> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> all calls to onElement() or onTimer() are synchronized for all keys. Think
>> of a single thread calling these methods.
>> Event-time timers are called when a watermark passes the timer.
>> Watermarks are received as special records, so the methods are called in
>> the same order in which records (actual records or watermarks) arrive at
>> the function. Actual synchronization is only required for processing-time
>> timers.
>>
>> The NPE might be thrown because of two timers that fire one after the
>> other without a new record being processed in between the onTimer() calls.
>> In that case the state is cleared in the first call and null in the second.
>>
>> Best, Fabian
>>
>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>> I have a few follow-up questions regarding ProcessFunction. I think
>>> the core should take care of any synchronization issues between calls
>>> to onElement and onTimer for a keyed stream, but my tests do not seem to
>>> confirm that.
>>>
>>> I have two specific questions.
>>>
>>> 1. Are calls to onElement(..) single-threaded when scoped to a key? That
>>> is, on a keyed stream, is there any way that two or more threads can
>>> operate on more than one element of a single key at the same time? Would I
>>> have to synchronize this construction?
>>>
>>> OUT accumulator = accumulatorState.value();
>>> if (accumulator == null) {
>>>     accumulator = acc.createNew();
>>> }
>>>
>>>
>>> 2. Can calls to onTimer(..) and onElement(..) happen concurrently for the
>>> same key? I intend to clean up state, but I see NullPointerExceptions thrown
>>> in onTimer(..), and I presume it is because onElement and onTimer are
>>> executed on two separate threads, with onTimer removing the state
>>> (clear()) after another thread has registered a timer (in onElement).
>>>
>>>
>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>     accumulatorState.clear();
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> PS. This is the full code.
>>>
>>>
>>>
>>> @Override
>>> public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>     TimerService timerService = context.timerService();
>>>     // Only handle elements that are not late relative to the current watermark.
>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>         OUT accumulator = accumulatorState.value();
>>>         if (accumulator == null) {
>>>             accumulator = acc.createNew();
>>>         }
>>>         accumulator.setLastModified(context.timestamp());
>>>         accumulatorState.update(accumulator);
>>>         // One timer per element; only the one matching lastModified + gap is acted upon.
>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>     }
>>> }
>>>
>>> @Override
>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>     OUT accumulator = accumulatorState.value();
>>>     // Guard: an earlier timer for this key may already have cleared the state.
>>>     if (accumulator == null) {
>>>         return;
>>>     }
>>>     if (timestamp == accumulator.getLastModified() + gap) {
>>>         accumulatorState.clear();
>>>     }
>>> }
>>>
>>>
>>>
>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> That's correct. Removal of timers is not supported in ProcessFunction.
>>>> Not sure why it is supported for Triggers.
>>>> The common workaround for ProcessFunctions is to register multiple
>>>> timers and keep a ValueState that stores the valid timestamp at which the
>>>> onTimer method should act.
>>>> When a timer fires and calls onTimer(), the method first checks whether
>>>> the timestamp is the valid one and returns immediately if it is not.
>>>> If you want to fire on the next watermark, another trick is to register
>>>> timers on (currentWatermark + 1). Since there is only one timer per
>>>> timestamp, repeated registrations keep overwriting the same single timer.
>>>> That timer is called when the watermark advances.
>>>>
>>>> As for the performance of the timer service: AFAIK, all methods that work
>>>> with some kind of timer use this service, so there is not much choice.
>>>>
>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>
>>>>> And that raises a further question: how performant is the Timer Service?
>>>>> I tried to look through the architecture behind it but could not find a
>>>>> definite answer. Is it a scheduled service, and if so, how many threads, etc.?
>>>>>
>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> Makes sense. I made a first stab at using ProcessFunction. The
>>>>>> TimerService exposed by the Context does not have a way to remove a timer. Is
>>>>>> that primarily because a priority queue is the storage, and removal from a
>>>>>> PriorityQueue is expensive? The TriggerContext does expose a version
>>>>>> that can remove timers, so I was wondering about this discrepancy...
>>>>>>
>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>>> object, and even if they do, it is not guaranteed that a mutation of the
>>>>>>> object in onElement() has an effect. The object might have been serialized
>>>>>>> and stored in RocksDB.
>>>>>>> Hence, elements should not be modified in onElement().
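The effect described above can be reproduced with plain Java serialization, used here only as a stand-in for the serialization a state backend such as RocksDB applies (Flink uses its own type serializers, not `java.io` serialization). After a serialize/deserialize round trip the stored object and the mutated object are different instances, so the mutation is not visible in the stored copy. Whether such a copy is made at all depends on the state backend and configuration, which may explain the local versus distributed difference reported in this thread. `EventPojo` and `CopyOnStateAccess` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical POJO standing in for the window element (IN) in this thread. */
class EventPojo implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = Long.MIN_VALUE;
}

final class CopyOnStateAccess {
    /** Round-trips an object through Java serialization, roughly like a serializing backend. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        EventPojo stored = new EventPojo();
        EventPojo seenByTrigger = roundTrip(stored); // a different object with equal contents
        seenByTrigger.watermark = 42L;               // the "onElement" mutation
        System.out.println(stored.watermark == Long.MIN_VALUE); // true: mutation not visible
    }
}
```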
>>>>>>>
>>>>>>> Have you considered implementing the operation completely in a
>>>>>>> ProcessFunction instead of a session window?
>>>>>>> This might be more code, but it is easier to design and reason about
>>>>>>> because there is no interaction of window assigner, trigger, and window
>>>>>>> function.
>>>>>>>
>>>>>>>
>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>
>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>
>>>>>>>> is where we could shape what is emitted. Again, for us it
>>>>>>>> seems natural to use the WM to materialize micro-batches with "approximate"
>>>>>>>> order (and no, I am not a fan of Spark micro-batches :)). Any pointers on
>>>>>>>> how we could write an implementation that allows "up till WM
>>>>>>>> emission" through a trigger on a session window would be very helpful. In
>>>>>>>> essence, I believe this is crucial for any "funnel" analysis.
>>>>>>>>
>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>
>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> The Trigger in this case would be some count-based trigger....
>>>>>>>>> Again, the motive is to keep the state lean, as we want to search for
>>>>>>>>> patterns, sorted on event time, in the incoming sessionized (and thus of
>>>>>>>>> indeterminate length) stream....
>>>>>>>>>
>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>> complain about MergeableWindow and state. The Session class in this
>>>>>>>>>> encapsulates the trim-up-to-watermark behavior (a reduce call after telling
>>>>>>>>>> it the current WM) that we want:
>>>>>>>>>>
>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>
>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>
>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>             s.add(e);
>>>>>>>>>>         }
>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>         s.reduce();
>>>>>>>>>>         out.collect(s);
>>>>>>>>>>         session.update(s);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void clear(Context context){
>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>         session.clear();
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Fabian, thank you for the response.
>>>>>>>>>>>
>>>>>>>>>>> I don't think that works, as it is the WM of the Window
>>>>>>>>>>> operator that is needed to make deterministic decisions, rather than that of
>>>>>>>>>>> an operator that precedes the window. This is doable with a
>>>>>>>>>>> ProcessWindowFunction using state, but only for non-mergeable
>>>>>>>>>>> windows.
>>>>>>>>>>>
>>>>>>>>>>> The best API option, I think, is a TimeBaseTrigger that fires
>>>>>>>>>>> on every configured progression of the WM, and a Window implementation that
>>>>>>>>>>> materializes *only data up till that WM* (it might hold more
>>>>>>>>>>> data, but that data has event times greater than the WM). I am not sure we
>>>>>>>>>>> have such a built-in option, and thus I was asking for access to the current WM
>>>>>>>>>>> in the window operator, to allow us to handle the "*only data up
>>>>>>>>>>> till that WM*" range retrieval using some custom data
>>>>>>>>>>> structure.
>>>>>>>>>>>
>>>>>>>>>>> Regards.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records
>>>>>>>>>>>> with the current watermark before passing them into the window operator.
>>>>>>>>>>>> The context object of the processElement() method gives access
>>>>>>>>>>>> to the current watermark and timestamp of a record.
>>>>>>>>>>>>
>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ...) of
>>>>>>>>>>>>> Trigger<IN, ...> the same object as the IN provided to add(IN
>>>>>>>>>>>>> value) of Accumulator<IN, ...>? It seems that mutations made to
>>>>>>>>>>>>> IN in onElement() are not visible to the accumulator that holds it as
>>>>>>>>>>>>> the previous element reference in the next invocation of add().
>>>>>>>>>>>>> This happens only in distributed mode, which makes sense only if these
>>>>>>>>>>>>> references point to different objects.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>
>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>
>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>             }
>>>>>>>>>>>>>
>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>
>>>>>>>>>>>>>             }
>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>
>>>>>>>>>>>>>    The Trigger
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>
>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>
>>>>>>>>>>>>>         .
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement() method,
>>>>>>>>>>>>>> specifically to supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>>>> The sequence of execution is:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Call add() on the window's accumulator, which saves the POJO
>>>>>>>>>>>>>> reference in the accumulator.
>>>>>>>>>>>>>> 2. Call onElement() on the Trigger.
>>>>>>>>>>>>>> 3. Set the watermark on the POJO.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The next add() call should see the last reference, including any
>>>>>>>>>>>>>> mutation made in step 3.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This works in a local test using LocalFlinkMiniCluster (the
>>>>>>>>>>>>>> subsequent add() sees the mutation that onElement() made to the POJO),
>>>>>>>>>>>>>> but not on a distributed cluster. My specific question is whether
>>>>>>>>>>>>>> add() on a window's accumulator and onElement() of that window's
>>>>>>>>>>>>>> trigger execute inline on the same thread, or whether some
>>>>>>>>>>>>>> serialization/deserialization or IPC causes this divergence (local
>>>>>>>>>>>>>> versus distributed).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
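One plausible reading of the local-versus-distributed difference in the question above, consistent with Fabian's later remark in this thread that the object "might have been serialized and stored in RocksDB", is sketched below in plain Java (no Flink dependency). The accumulator keeps a reference to the last element. When state is held as live objects (assumed, not confirmed, to be the case in the local test) the mutation made after add() is visible; when the accumulator is serialized after add(), the stored copy predates the mutation made in onElement(). Event, Acc, toBytes, fromBytes and watermarkSeenByNextAdd are hypothetical names, and Java serialization only stands in for whatever serializer a real state backend uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedStateDemo {

    // Hypothetical stand-in for the POJO that should carry the watermark.
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = Long.MIN_VALUE;
    }

    // Hypothetical accumulator that keeps a reference to the last element seen by add().
    static class Acc implements Serializable {
        private static final long serialVersionUID = 1L;
        Event last;
    }

    // Java serialization stands in for the serializer a RocksDB-style backend would apply.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    // Replays add() followed by onElement() for one element and returns the
    // watermark that the next add() would observe through the accumulator.
    static long watermarkSeenByNextAdd(boolean serializedState)
            throws IOException, ClassNotFoundException {
        Event element = new Event();
        Acc acc = new Acc();
        acc.last = element;                                    // add(): keep a reference
        byte[] stored = serializedState ? toBytes(acc) : null; // backend persists the accumulator

        element.watermark = 42L;                               // onElement(): mutate the element

        Acc reread = serializedState ? (Acc) fromBytes(stored) : acc; // next add() reads state back
        return reread.last.watermark;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(watermarkSeenByNextAdd(false)); // 42: live object, mutation visible
        System.out.println(watermarkSeenByNextAdd(true));  // Long.MIN_VALUE: stored copy predates it
    }
}
```

The sketch only shows why a mutation can be lost once state is copied; it does not model Flink's actual operator code path.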

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> all calls to processElement() and onTimer() are synchronized, for all keys.
> Think of a single thread calling these methods.
> Event-time timers are called when a watermark passes the timer. Watermarks
> are received as special records, so the methods are called in the same
> order as records (actual records or watermarks) arrive at the function.
> Actual synchronization is required only for processing-time timers.
>
> The NPE might be thrown because two timers fire one after the other
> without a new record being processed in between the onTimer() calls.
> In that case, the state is cleared in the first call and is null in the
> second.
>
> Best, Fabian
>
> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> Thanks.
>>
>>
>>
>>
>>     I have a few follow-up questions regarding ProcessFunction. I think
>> that the core should take care of any synchronization issues between calls
>> to processElement and onTimer in the case of a keyed stream, but my tests do
>> not seem to suggest that.
>>
>>
>>
>> I have two specific questions.
>>
>>
>> 1. Are calls to processElement(..) single-threaded when scoped to a key? That
>> is, on a keyed stream, is there a way that two or more threads can process
>> more than one element of the same key at the same time? Would I have to
>> synchronize this construction?
>>
>>
>>
>>
>>     OUT accumulator = accumulatorState.value();
>>     if (accumulator == null) {
>>         accumulator = acc.createNew();
>>     }
>>
>>
>>
>> 2. Can calls to onTimer(..) and processElement(..) happen concurrently for
>> the same key? I intend to clean up state, but I see NullPointerExceptions
>> thrown in onTimer(..), and I presume it is because processElement and onTimer
>> are executed on two separate threads, with onTimer removing the state
>> (clear()) after another thread has registered a timer (in processElement).
>>
>>
>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>     accumulatorState.clear();
>> }
>>
>>
>>
>>
>>
>>
>> PS. This is the full code.
>>
>>
>>
>> @Override
>> public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>     TimerService timerService = context.timerService();
>>     if (context.timestamp() > timerService.currentWatermark()) {
>>         OUT accumulator = accumulatorState.value();
>>         if (accumulator == null) {
>>             accumulator = acc.createNew();
>>         }
>>         accumulator.setLastModified(context.timestamp());
>>         accumulatorState.update(accumulator);
>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>     }
>> }
>>
>> @Override
>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>     OUT accumulator = accumulatorState.value();
>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>         accumulatorState.clear();
>>     }
>> }
>>
>>
>>
>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> That's correct. Removal of timers is not supported in ProcessFunction.
>>> Not sure why this is supported for Triggers.
>>> The common workaround for ProcessFunctions is to register multiple
>>> timers and have a ValueState that stores the valid timestamp on which the
>>> onTimer method should be executed.
>>> When a timer fires and calls onTimer(), the method first checks whether
>>> the timestamp is the correct one and leaves the method if that is not the
>>> case.
>>> If you want to fire on the next watermark, another trick is to register
>>> multiple timers on (currentWatermark + 1). Since there is only one timer
>>> per timestamp, there is only one timer which gets continuously overwritten.
>>> The timer is called when the watermark is advanced.
>>>
>>> Regarding the performance of the timer service: AFAIK, all methods that work
>>> with some kind of timer use this service, so there is not much choice.
>>>
>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>
>>>> And that raises a further question: how performant is the timer service? I
>>>> tried to go through the architecture behind it but could not find a
>>>> definite clue. Is it a scheduled service, and if so, how many threads etc.?
>>>>
>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> Makes sense. I took a first stab at using ProcessFunction. The
>>>>> TimerService exposed by the Context has no method to remove a timer. Is
>>>>> that primarily because a priority queue is the storage and removal from a
>>>>> PriorityQueue is expensive? The TriggerContext does expose a version that
>>>>> can remove timers, so I was wondering about this dissonance.
>>>>>
>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>>> object in onElement() has an effect. The object might have been serialized
>>>>>> and stored in RocksDB.
>>>>>> Hence, elements should not be modified in onElement().
>>>>>>
>>>>>> Have you considered implementing the operation completely in a
>>>>>> ProcessFunction instead of a session window?
>>>>>> This might be more code but easier to design and reason about because
>>>>>> there is no interaction of window assigner, trigger, and window function.
>>>>>>
>>>>>>
>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com
>>>>>> >:
>>>>>>
>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>
>>>>>>> is where we could shape what is emitted. Again, for us it seems natural
>>>>>>> to use the WM to materialize micro-batches with "approximate" order (and
>>>>>>> no, I am not a fan of Spark micro-batches :)). Any pointers on how we
>>>>>>> could write an implementation that allows "up till WM" emission through
>>>>>>> a trigger on a session window would be very helpful. In essence, I
>>>>>>> believe this is crucial for any "funnel" analysis.
>>>>>>>
>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>
>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> The trigger in this case would be some count-based trigger. Again, the
>>>>>>>> motive is to keep the state lean, as we want to search for patterns,
>>>>>>>> sorted on event time, in the incoming sessionized (and thus of
>>>>>>>> nondeterministic length) stream.
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>>>>> about mergeable windows and state. The Session class here encapsulates
>>>>>>>>> the trim-up-to-watermark behavior we want (a reduce() call after telling
>>>>>>>>> it the current WM):
>>>>>>>>>
>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>
>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>
>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>             s.add(e);
>>>>>>>>>         }
>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>         s.reduce();
>>>>>>>>>         out.collect(s);
>>>>>>>>>         session.update(s);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void clear(Context context){
>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>         session.clear();
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
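The "only data up till that WM" emission discussed in the messages quoted above can be sketched, outside Flink, as a buffer ordered by event time that releases only the events at or below the current watermark and keeps the rest. WatermarkBoundedBuffer, add, drainUpTo and pending are hypothetical names, and the sketch assumes a single key held in memory; in a real job this logic would have to live in keyed state (for example in the ProcessFunction Fabian suggests), which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkBoundedBuffer<E> {

    // Events grouped by event time; the TreeMap keeps timestamps sorted.
    private final TreeMap<Long, List<E>> byEventTime = new TreeMap<>();

    public void add(long eventTime, E event) {
        byEventTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns, in event-time order, every event with timestamp <= watermark.
    // Events newer than the watermark stay buffered for a later call.
    public List<E> drainUpTo(long watermark) {
        List<E> ready = new ArrayList<>();
        Map<Long, List<E>> head = byEventTime.headMap(watermark, true);
        for (List<E> batch : head.values()) {
            ready.addAll(batch);
        }
        head.clear(); // the view is backed by the map, so this drops the released events
        return ready;
    }

    // Number of events still waiting for the watermark to pass them.
    public int pending() {
        int n = 0;
        for (List<E> batch : byEventTime.values()) {
            n += batch.size();
        }
        return n;
    }

    public static void main(String[] args) {
        WatermarkBoundedBuffer<String> buf = new WatermarkBoundedBuffer<>();
        buf.add(30L, "c");
        buf.add(10L, "a");
        buf.add(20L, "b");
        buf.add(40L, "d");
        System.out.println(buf.drainUpTo(25L)); // [a, b]
        System.out.println(buf.pending());      // 2
        System.out.println(buf.drainUpTo(40L)); // [c, d]
    }
}
```

Keying the buffer by timestamp turns the "up till WM" range retrieval into a single headMap() view; clearing that view removes the released events from the buffer.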

Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

all calls to processElement() and onTimer() are synchronized, for all keys.
Think of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks
are received as special records, so the methods are called in the same
order as records (actual records or watermarks) arrive at the function.
Actual synchronization is required only for processing-time timers.

The NPE might be thrown because two timers fire one after the other without
a new record being processed in between the onTimer() calls. In that case,
the state is cleared in the first call and is null in the second.

Best, Fabian
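The back-to-back timer case described above can be reproduced with a small single-key simulation of the processElement()/onTimer() pair quoted in this thread, written in plain Java rather than against Flink. GapTimerSim and its members are hypothetical: a Long field stands in for the ValueState, a TreeSet stands in for the timer service (so registering the same timestamp twice yields one timer), and advanceWatermark() fires due timers in timestamp order on the caller's thread. onTimer() adds the null check that avoids the NPE and ignores stale timers, following the valid-timestamp workaround from Fabian's Dec 21 reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class GapTimerSim {

    private final long gap;
    private long currentWatermark = Long.MIN_VALUE;
    // Stand-in for ValueState: last-modified timestamp of the session (null = cleared).
    private Long lastModified = null;
    // Stand-in for the timer service; a set keeps at most one timer per timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();

    final List<Long> expiredAt = new ArrayList<>();
    int nullStateTimers = 0;

    GapTimerSim(long gap) {
        this.gap = gap;
    }

    // Mirrors the quoted processElement(): late elements are dropped; otherwise
    // lastModified is overwritten with this element's timestamp (not the max).
    void processElement(long timestamp) {
        if (timestamp > currentWatermark) {
            lastModified = timestamp;
            timers.add(timestamp + gap);
        }
    }

    // A watermark fires every timer <= watermark, in timestamp order, on the same
    // thread that calls processElement(), so no locking is involved.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    void onTimer(long timestamp) {
        if (lastModified == null) {
            nullStateTimers++; // state cleared by an earlier timer: the NPE case, now guarded
            return;
        }
        if (timestamp != lastModified + gap) {
            return;            // stale timer from an older element: ignore it
        }
        expiredAt.add(timestamp);
        lastModified = null;   // equivalent of accumulatorState.clear()
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        GapTimerSim sim = new GapTimerSim(5);
        sim.processElement(12);   // timer at 17
        sim.processElement(10);   // out of order, not late: lastModified = 10, timer at 15
        sim.advanceWatermark(20); // 15 matches 10 + 5 and clears; 17 then finds null state
        System.out.println(sim.expiredAt);       // [15]
        System.out.println(sim.nullStateTimers); // 1
    }
}
```

Because processElement() overwrites lastModified with the arriving element's timestamp rather than the maximum, the out-of-order element at 10 makes the timer at 15 the valid one, so the timer at 17 later finds cleared state. Using Math.max(lastModified, timestamp) instead would treat the timer at 15 as stale and expire the session at 17; which rule is intended depends on the desired session semantics.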

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> Thanks.
>
>
>
>
>     I have a few follow up questions regarding ProcessFunction. I think
> that the core should take care of any synchronization issues between calls
> to onElement and onTimer in case of a keyed stream but tests do not seem to
> suggest that.
>
>
>
> I have  specifically 2 questions.
>
>
> 1.  Are calls  to onElement(..) single threaded if scoped to a key ? As
> in on a keyed stream, is there a  way that 2 or more threads can execute
> on the more than one element of a single key at one time ? Would I have to
> synchronize this construction
>
>
>
>
> *OUT accumulator = accumulatorState.value();        if (accumulator == null) {            accumulator = acc.createNew();        }*
>
>
>
> 2. Can concurrent calls happen  onTimer(..) and onElement(..) for the
> same key ? I intend to clean up state but I see  NullPointers in
> OnTimer(..) thrown and I presume it is b'coz the onElement and onTimer are
> executed on 2  separate threads, with on Timer removing the state (
> clear() ) but after another thread has registered a Timer ( in onElement ).
>
>
> if (timestamp == accumulator.getLastModified() + gap) {* // NullPointers on Race Conditions*
>         accumulatorState.clear();
>     }
>
>
>
>
>
>
> PS. This is the full code.
>
>
>
> @Override
> public  void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>     TimerService timerService = context.timerService();
>     if (context.timestamp() > timerService.currentWatermark()) {
>         OUT accumulator = accumulatorState.value();
>         if (accumulator == null) {
>             accumulator = acc.createNew();
>         }
>         accumulator.setLastModified(context.timestamp());
>         accumulatorState.update(accumulator);
>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>     }
> }
>
> @Override
> public  void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>     OUT accumulator = accumulatorState.value();
>     if (timestamp == accumulator.getLastModified() + gap) {* // NullPointers on Race Conditions*
>         accumulatorState.clear();
>     }
> }
>
>
>
> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> That's correct. Removal of timers is not supported in ProcessFunction.
>> Not sure why this is supported for Triggers.
>> The common workaround for ProcessFunctions is to register multiple timers
>> and have a ValueState that stores the valid timestamp on which the onTimer
>> method should be executed.
>> When a timer fires and calls onTimer(), the method first checks whether
>> the timestamp is the correct one and leaves the method if that is not the
>> case.
>> If you want to fire on the next watermark, another trick is to register
>> multiple timers on (currentWatermark + 1). Since there is only one timer
>> per timestamp, there is only one timer which gets continuously overwritten.
>> The timer is called when the watermark is advanced.
>>
>> On the performance of the timer service. AFAIK, all methods that work
>> with some kind of timer use this service. So there is not much choice.
>>
>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> And that further begs the question.. how performant is Timer Service. I
>>> tried to peruse through the architecture behind it but cold not find a
>>> definite clue. Is it a Scheduled Service and if yes how many threads etc...
>>>
>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> Makes sense. Did a first stab at Using ProcessFunction. The TimeService
>>>> exposed by the Context does not have remove timer. Is it primarily b'coz A
>>>> Priority Queue is the storage ad remove from a PriorityQueue is expensive
>>>> ?  Trigger Context does expose another version that has removal abilities
>>>> so was wondering why this dissonance...
>>>>
>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>> object in onElement() has an effect. The object might have been serialized
>>>>> and stored in RocksDB.
>>>>> Hence, elements should not be modified in onElement().
>>>>>
>>>>> Have you considered to implement the operation completely in a
>>>>> ProcessFunction instead of a session window?
>>>>> This might be more code but easier to design and reason about because
>>>>> there is no interaction of window assigner, trigger, and window function.
>>>>>
>>>>>
>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vi...@gmail.com>
>>>>> :
>>>>>
>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c9
>>>>>> 83913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/o
>>>>>> rg/apache/flink/streaming/runtime/operators/windowing/Window
>>>>>> Operator.java#L362
>>>>>>
>>>>>> is where We could fashion as to what is emitted. Again for us it
>>>>>> seems natural to use WM to materialize a micro batches with "approximate"
>>>>>> order ( and no I am not a fan of spark micro batches :)). Any pointers as
>>>>>> to how we could write an implementation that allows for "up till WM
>>>>>> emission" through a trigger on a Session Window would be very helpful. In
>>>>>> essence I believe that for any "funnel" analysis it is crucial.
>>>>>>
>>>>>> Something like https://github.com/apache
>>>>>> /flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-s
>>>>>> treaming-java/src/main/java/org/apache/flink/streaming/runti
>>>>>> me/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>
>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> The Trigger in this case would be some CountBased Trigger.... Again
>>>>>>> the motive is the keep the state lean as we desire to search for  patterns,
>>>>>>> sorted on even time,  in the incoming sessionized ( and thus of un
>>>>>>> deterministic length ) stream....
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> For example, this would have worked perfect if it did not complain
>>>>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>>>>> the  trim up to watermark behavior ( reduce call after telling it the
>>>>>>>> current WM )  we desire
>>>>>>>>
>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>
>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>
>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>         for (Event e : elements) {
>>>>>>>>             s.add(e);
>>>>>>>>         }
>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>         s.reduce();
>>>>>>>>         out.collect(s);
>>>>>>>>         session.update(s);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void clear(Context context){
>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>         session.clear();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>
>>>>>>>>>  I think that does not work, as it is the WM of the Window
>>>>>>>>> Operator is what is desired to make deterministic decisions rather than off
>>>>>>>>> an operator the precedes the Window ? This is doable using
>>>>>>>>> ProcessWindowFunction using state but only in the case of non mergeable
>>>>>>>>> windows.
>>>>>>>>>
>>>>>>>>>    The best API  option I think is a TimeBaseTrigger that fires
>>>>>>>>> every configured time progression of WM  and a Window implementation that
>>>>>>>>> materializes *only data up till that WM* ( it might have more
>>>>>>>>> data but that data has event time grater than the WM ). I am not sure we
>>>>>>>>> have that built in option and thus was asking for an access the current WM
>>>>>>>>> for the window operator to allow  us handle the "*only data up
>>>>>>>>> till that WM" *range retrieval using some  custom data structure.
>>>>>>>>>
>>>>>>>>> Regards.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vishal,
>>>>>>>>>>
>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>> I would recommend to use a ProcessFunction to enrich records with
>>>>>>>>>> the current watermark before passing them into the window operator.
>>>>>>>>>> The context object of the processElement() method gives access to
>>>>>>>>>> the current watermark and timestamp of a record.
>>>>>>>>>>
>>>>>>>>>> Please note that watermarks are not deterministic but may depend
>>>>>>>>>> on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> An addendum
>>>>>>>>>>>
>>>>>>>>>>> Is the element reference IN  in onElement(IN element.. ) in
>>>>>>>>>>> Trigger<IN,..>, the same as IN the one provided to add(IN
>>>>>>>>>>> value) in Accumulator<IN,..>. It seems that any mutations to IN
>>>>>>>>>>> in the onElement() is not visible to the Accumulator that is carrying it as
>>>>>>>>>>> a previous element  reference albeit in the next invocation of add(). This
>>>>>>>>>>> seems to be only in distributed mode, which makes sense only if theses
>>>>>>>>>>> reference point to different objects.
>>>>>>>>>>>
>>>>>>>>>>> The pipeline
>>>>>>>>>>>
>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>> .aggregate(
>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>
>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>
>>>>>>>>>>>             }
>>>>>>>>>>>             .....
>>>>>>>>>>>
>>>>>>>>>>>    The Trigger
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>
>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>
>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>
>>>>>>>>>>>         .
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>> specifically supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>> The sequence of execution is:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. call add() on the accumulator for the window and save the POJO
>>>>>>>>>>>> reference in the Accumulator.
>>>>>>>>>>>> 2. call onElement on the Trigger
>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>
>>>>>>>>>>>> The next add() call should see the last reference and any
>>>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>>>
>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in
>>>>>>>>>>>> that the mutation made by onElement() is visible on the POJO in the
>>>>>>>>>>>> subsequent add(), but not on a distributed cluster. My specific question
>>>>>>>>>>>> is whether add() on the accumulator of a window and the onElement()
>>>>>>>>>>>> method of the trigger on that window are executed inline, on the same
>>>>>>>>>>>> thread, or whether some serialization/deserialization or IPC causes
>>>>>>>>>>>> this divergence (local versus distributed).
>>>>>>>>>>>>
>>>>>>>>>>>> Regards.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
Thanks.

I have a few follow-up questions regarding ProcessFunction. I think that the
core should take care of any synchronization issues between calls to
processElement and onTimer in the case of a keyed stream, but my tests do not
seem to suggest that.

I have two specific questions.

1. Are calls to processElement(..) single-threaded when scoped to a key? That
is, on a keyed stream, can two or more threads process more than one element
of the same key at the same time? Would I have to synchronize this
construction?

OUT accumulator = accumulatorState.value();
if (accumulator == null) {
    accumulator = acc.createNew();
}

2. Can calls to onTimer(..) and processElement(..) happen concurrently for the
same key? I intend to clean up state, but I see NullPointerExceptions thrown in
onTimer(..). I presume this is because processElement and onTimer are executed
on two separate threads, with onTimer removing the state (clear()) after
another thread has registered a timer (in processElement).

if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
    accumulatorState.clear();
}

PS. This is the full code.

@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
    TimerService timerService = context.timerService();
    if (context.timestamp() > timerService.currentWatermark()) {
        OUT accumulator = accumulatorState.value();
        if (accumulator == null) {
            accumulator = acc.createNew();
        }
        accumulator.setLastModified(context.timestamp());
        accumulatorState.update(accumulator);
        timerService.registerEventTimeTimer(context.timestamp() + gap);
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
    OUT accumulator = accumulatorState.value();
    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
        accumulatorState.clear();
    }
}
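
On the two questions above: as far as I know, Flink never runs processElement() and onTimer() of a keyed function concurrently for one parallel instance (timers fire on the task thread or under the same lock), so the lazy initialization should not need extra synchronization. The NullPointerException is more plausibly an ordering effect: lastModified is overwritten with context.timestamp() even when an element is older than the previous one, so an earlier timer can match and clear the state, and a later timer then reads null. The plain-Java model below is a sketch, not Flink code; the class and its methods are hypothetical stand-ins for ValueState and event-time timers of a single key. It replays that sequence with two changes: keeping the larger timestamp, and a null check in onTimer().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy single-key model of the function above (not Flink code). A nullable field
// stands in for ValueState; a sorted set of timestamps stands in for event-time
// timers, which fire once the watermark reaches them.
class SessionCleanupModel {
    static final class Acc {
        long lastModified = Long.MIN_VALUE;
    }

    private final long gap;
    private Acc state;                                     // plays accumulatorState
    private final TreeSet<Long> timers = new TreeSet<>();  // one timer per timestamp
    private long watermark = Long.MIN_VALUE;
    final List<Long> clearedAt = new ArrayList<>();

    SessionCleanupModel(long gap) {
        this.gap = gap;
    }

    // Mirrors processElement(): elements at or behind the watermark are ignored.
    void processElement(long timestamp) {
        if (timestamp <= watermark) {
            return;
        }
        Acc acc = state;
        if (acc == null) {
            acc = new Acc();
        }
        // Change 1: keep the maximum, so an out-of-order element
        // cannot move lastModified backwards.
        acc.lastModified = Math.max(acc.lastModified, timestamp);
        state = acc;
        timers.add(timestamp + gap);
    }

    // Advances the watermark and fires every timer at or below it, in order.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Mirrors onTimer().
    private void onTimer(long timestamp) {
        Acc acc = state;
        // Change 2: an earlier timer may already have cleared the state.
        if (acc == null) {
            return;
        }
        if (timestamp == acc.lastModified + gap) {
            state = null;
            clearedAt.add(timestamp);
        }
    }

    boolean hasState() {
        return state != null;
    }
}
```

In the real function, the same two changes would mean storing the larger of the old and new timestamps via setLastModified(...) and returning early from onTimer() when accumulatorState.value() returns null.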



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fh...@gmail.com> wrote:

> That's correct. Removal of timers is not supported in ProcessFunction. Not
> sure why this is supported for Triggers.
> The common workaround for ProcessFunctions is to register multiple timers
> and have a ValueState that stores the valid timestamp on which the onTimer
> method should be executed.
> When a timer fires and calls onTimer(), the method first checks whether
> the timestamp is the correct one and leaves the method if that is not the
> case.
> If you want to fire on the next watermark, another trick is to repeatedly
> register a timer on (currentWatermark + 1). Since there is only one timer
> per key and timestamp, there is only one timer, which gets continuously
> overwritten.
> The timer is called when the watermark is advanced.
>
> On the performance of the timer service: AFAIK, all methods that work with
> some kind of timer use this service, so there is not much choice.
>
> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> And that further raises the question: how performant is the timer service?
>> I tried to go through the architecture behind it but could not find a
>> definite answer. Is it a scheduled service, and if so, how many threads does it use?
>>
>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Makes sense. I took a first stab at using ProcessFunction. The TimerService
>>> exposed by the Context does not have a method to remove a timer. Is that
>>> primarily because a PriorityQueue is the storage and removal from a
>>> PriorityQueue is expensive? The TriggerContext does expose a version with
>>> removal abilities, so I was wondering about this dissonance...
>>>
>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> it is not guaranteed that add() and onElement() receive the same
>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>> object in onElement() has an effect. The object might have been serialized
>>>> and stored in RocksDB.
>>>> Hence, elements should not be modified in onElement().
>>>>
>>>> Have you considered implementing the operation completely in a
>>>> ProcessFunction instead of a session window?
>>>> This might be more code but easier to design and reason about because
>>>> there is no interaction of window assigner, trigger, and window function.
>>>>
>>>>
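
Fabian's point can be shown without Flink. The plain-Java sketch below uses hypothetical classes (not Flink's state backends) to contrast a store that keeps the object reference, roughly what an on-heap backend does, with one that keeps serialized bytes, as RocksDB does. The element is stored first (like add()) and then mutated (like onElement()). Assuming the local test kept state on the heap while the cluster used RocksDB, this would explain why the mutation was visible only locally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the POJO from the thread.
class WatermarkedPojo implements Serializable {
    private static final long serialVersionUID = 1L;
    long waterMark = Long.MIN_VALUE;
}

// Toy store that keeps serialized bytes, like a RocksDB-backed state (not Flink code).
class SerializingStore {
    private byte[] bytes;

    void put(WatermarkedPojo value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bytes = buffer.toByteArray();
    }

    WatermarkedPojo get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (WatermarkedPojo) in.readObject(); // always a fresh copy
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Toy store that keeps the reference itself, like on-heap state.
class HeapStore {
    private WatermarkedPojo value;

    void put(WatermarkedPojo v) { value = v; }

    WatermarkedPojo get() { return value; }
}

class CopySemanticsDemo {
    // "add()" stores the element, "onElement()" mutates the original object,
    // and the "next add()" reads the stored value back.
    static long seenBySerializingStore(long waterMark) {
        SerializingStore store = new SerializingStore();
        WatermarkedPojo element = new WatermarkedPojo();
        store.put(element);
        element.waterMark = waterMark;
        return store.get().waterMark;
    }

    static long seenByHeapStore(long waterMark) {
        HeapStore store = new HeapStore();
        WatermarkedPojo element = new WatermarkedPojo();
        store.put(element);
        element.waterMark = waterMark;
        return store.get().waterMark;
    }
}
```

The serialized copy still carries Long.MIN_VALUE after the mutation, while the heap reference carries the new value, which is why relying on shared references between the trigger and the accumulator is fragile.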
>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>
>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>
>>>>> is where we could shape what is emitted. Again, for us it seems natural
>>>>> to use the WM to materialize micro-batches with "approximate" order (and
>>>>> no, I am not a fan of Spark micro-batches :)). Any pointers on how we
>>>>> could write an implementation that allows "up till WM" emission through
>>>>> a trigger on a session window would be very helpful. In essence, I
>>>>> believe this is crucial for any "funnel" analysis.
>>>>>
>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>
>>>>> I know I am simplifying this and there has to be more to it...
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> The Trigger in this case would be some count-based Trigger. Again,
>>>>>> the motive is to keep the state lean, as we want to search for patterns,
>>>>>> sorted on event time, in the incoming sessionized (and thus of
>>>>>> nondeterministic length) stream.
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>>> about MergeableWindow and state. The Session class here encapsulates
>>>>>>> the trim-up-to-watermark behavior we desire (a reduce() call after
>>>>>>> telling it the current WM):
>>>>>>>
>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>
>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>
>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>         for (Event e : elements) {
>>>>>>>             s.add(e);
>>>>>>>         }
>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>         s.reduce();
>>>>>>>         out.collect(s);
>>>>>>>         session.update(s);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void clear(Context context){
>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>         session.clear();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Fabian, thank you for the response.
>>>>>>>>
>>>>>>>> I think that does not work, as it is the WM of the window operator
>>>>>>>> that is needed to make deterministic decisions, rather than the WM of
>>>>>>>> an operator that precedes the window. This is doable using a
>>>>>>>> ProcessWindowFunction with state, but only for non-mergeable windows.
>>>>>>>>
>>>>>>>> The best API option, I think, is a time-based trigger that fires on
>>>>>>>> every configured progression of the WM, and a window implementation
>>>>>>>> that materializes *only data up till that WM* (the window might hold
>>>>>>>> more data, but that data has an event time greater than the WM). I am
>>>>>>>> not sure there is such a built-in option, which is why I was asking for
>>>>>>>> access to the current WM of the window operator, so that we can handle
>>>>>>>> the "only data up till that WM" range retrieval with some custom data
>>>>>>>> structure.
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
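
The "only data up till that WM" retrieval described above needs a buffer ordered by event time. A minimal plain-Java sketch, with hypothetical names and without any Flink state wiring, keeps events in a TreeMap keyed by event time and drains only the entries at or below a given watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical buffer: events sorted by event time; drainUpTo(watermark)
// returns, in event-time order, only the events at or below the watermark.
class WatermarkBoundedBuffer<E> {
    // event time -> events with that timestamp, in arrival order
    private final TreeMap<Long, List<E>> byTime = new TreeMap<>();

    void add(long eventTime, E event) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    List<E> drainUpTo(long watermark) {
        List<E> out = new ArrayList<>();
        // headMap(watermark, true) is a live view of all keys <= watermark.
        Map<Long, List<E>> ready = byTime.headMap(watermark, true);
        for (List<E> events : ready.values()) {
            out.addAll(events);
        }
        ready.clear(); // clearing the view also removes the entries from byTime
        return out;
    }

    int size() {
        int n = 0;
        for (List<E> events : byTime.values()) {
            n += events.size();
        }
        return n;
    }
}
```

Events with an event time greater than the watermark stay buffered for a later call, which matches the behavior described in the email.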
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> the Trigger is not designed to augment records but just to control
>>>>>>>>> when a window is evaluated.
>>>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>>>> the current watermark before passing them into the window operator.
>>>>>>>>> The context object of the processElement() method gives access to
>>>>>>>>> the current watermark and timestamp of a record.
>>>>>>>>>
>>>>>>>>> Please note that watermarks are not deterministic but may depend
>>>>>>>>> on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
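
The remark that watermarks may depend on the order in which parallel inputs are consumed follows from how an operator with several input channels tracks its watermark: it uses the minimum of the latest watermark seen on each channel (ignoring idle inputs). The toy model below is not Flink's implementation, and the class name is hypothetical.

```java
import java.util.Arrays;

// Toy model (not Flink code): an operator's watermark is the minimum of the
// latest watermark received on each input channel, so it only advances once
// every channel has advanced.
class MultiInputWatermark {
    private final long[] channelWatermarks;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return current();
    }

    long current() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With two channels, a record arriving on channel 0 right after a watermark of 10 on that channel still sees Long.MIN_VALUE, while the same record arriving after channel 1 has sent 20 sees 10; which of the two happens depends on how the channels are interleaved at runtime.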

Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
That's correct. Removal of timers is not supported in ProcessFunction. Not
sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers
and have a ValueState that stores the valid timestamp on which the onTimer
method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the
timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to repeatedly
register a timer on (currentWatermark + 1). Since there is only one timer
per key and timestamp, there is only one timer, which gets continuously overwritten.
The timer is called when the watermark is advanced.
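
The (currentWatermark + 1) trick relies on Flink keeping at most one timer per key and timestamp (the Flink documentation calls this timer coalescing). The toy single-key model below is not Flink's timer service, and the class name is hypothetical; it only shows that repeated registrations collapse into one pending timer that fires once the watermark advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy single-key model (not Flink code) of event-time timers with at most
// one timer per timestamp.
class NextWatermarkTimerModel {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;
    final List<Long> fired = new ArrayList<>();

    // What processElement() would do to be called back on the next watermark.
    void onElement() {
        // Adding a timestamp that is already registered does not create a second timer.
        timers.add(watermark + 1);
    }

    // Advances the watermark and fires all timers at or below it.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

Three elements arriving while the watermark is 10 leave a single timer at 11, which fires once when the watermark moves to 20.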

On the performance of the timer service: AFAIK, all methods that work with
some kind of timer use this service, so there is not much choice.


Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
And that further raises the question: how performant is the timer service?
I tried to go through the architecture behind it but could not find a
definite answer. Is it a scheduled service, and if so, how many threads does it use?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> Makes sense. Did a first stab at Using ProcessFunction. The TimeService
> exposed by the Context does not have remove timer. Is it primarily b'coz A
> Priority Queue is the storage ad remove from a PriorityQueue is expensive
> ?  Trigger Context does expose another version that has removal abilities
> so was wondering why this dissonance...
>
> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> it is not guaranteed that add() and onElement() receive the same object,
>> and even if they do it is not guaranteed that a mutation of the object in
>> onElement() has an effect. The object might have been serialized and stored
>> in RocksDB.
>> Hence, elements should not be modified in onElement().
>>
>> Have you considered to implement the operation completely in a
>> ProcessFunction instead of a session window?
>> This might be more code but easier to design and reason about because
>> there is no interaction of window assigner, trigger, and window function.
>>
>>
>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c9
>>> 83913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/o
>>> rg/apache/flink/streaming/runtime/operators/windowing/Window
>>> Operator.java#L362
>>>
>>> is where We could fashion as to what is emitted. Again for us it seems
>>> natural to use WM to materialize a micro batches with "approximate" order (
>>> and no I am not a fan of spark micro batches :)). Any pointers as to how we
>>> could write an implementation that allows for "up till WM emission" through
>>> a trigger on a Session Window would be very helpful. In essence I believe
>>> that for any "funnel" analysis it is crucial.
>>>
>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c98
>>> 3913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/or
>>> g/apache/flink/streaming/runtime/operators/windowing/Evictin
>>> gWindowOperator.java#L346
>>>
>>> I know I am simplifying this and there has to be more to it...
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> The Trigger in this case would be some CountBased Trigger.... Again the
>>>> motive is the keep the state lean as we desire to search for  patterns,
>>>> sorted on even time,  in the incoming sessionized ( and thus of un
>>>> deterministic length ) stream....
>>>>
>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> For example, this would have worked perfect if it did not complain
>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>> the  trim up to watermark behavior ( reduce call after telling it the
>>>>> current WM )  we desire
>>>>>
>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>
>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>
>>>>>     @Override
>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>
>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>         for (Event e : elements) {
>>>>>             s.add(e);
>>>>>         }
>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>         s.reduce();
>>>>>         out.collect(s);
>>>>>         session.update(s);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void clear(Context context){
>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>         session.clear();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> Hello Fabian, Thank you for the response.
>>>>>>
>>>>>>  I think that does not work, as it is the WM of the Window Operator
>>>>>> is what is desired to make deterministic decisions rather than off an
>>>>>> operator the precedes the Window ? This is doable using
>>>>>> ProcessWindowFunction using state but only in the case of non mergeable
>>>>>> windows.
>>>>>>
>>>>>>    The best API  option I think is a TimeBaseTrigger that fires every
>>>>>> configured time progression of WM  and a Window implementation that
>>>>>> materializes *only data up till that WM* ( it might have more data
>>>>>> but that data has event time grater than the WM ). I am not sure we have
>>>>>> that built in option and thus was asking for an access the current WM for
>>>>>> the window operator to allow  us handle the "*only data up till that
>>>>>> WM" *range retrieval using some  custom data structure.
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> the Trigger is not designed to augment records but just to control
>>>>>>> when a window is evaluated.
>>>>>>> I would recommend to use a ProcessFunction to enrich records with
>>>>>>> the current watermark before passing them into the window operator.
>>>>>>> The context object of the processElement() method gives access to
>>>>>>> the current watermark and timestamp of a record.
>>>>>>>
>>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com>:
>>>>>>>
>>>>>>>> An addendum
>>>>>>>>
>>>>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>>>>> Trigger<IN, ...> the same IN as the one provided to add(IN value) in
>>>>>>>> Accumulator<IN, ...>? It seems that mutations to IN in onElement() are
>>>>>>>> not visible to the Accumulator that holds it as a previous element
>>>>>>>> reference, at the next invocation of add(). This seems to happen only in
>>>>>>>> distributed mode, which makes sense only if these references point to
>>>>>>>> different objects.
>>>>>>>>
>>>>>>>> The pipeline
>>>>>>>>
>>>>>>>> .keyBy(keySelector)
>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>> .aggregate(
>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>                 return newInstance;
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>
>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>
>>>>>>>>
>>>>>>>>                 accumulator.add(value);
>>>>>>>>
>>>>>>>>             }
>>>>>>>>             .....
>>>>>>>>
>>>>>>>>    The Trigger
>>>>>>>>
>>>>>>>>
>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>
>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>
>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>
>>>>>>>>         .
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>> specifically to supply the POJO with the watermark from the TriggerContext.
>>>>>>>>> The sequence of execution is:
>>>>>>>>>
>>>>>>>>> 1. call add() on the window's accumulator and save the
>>>>>>>>> POJO reference in the Accumulator.
>>>>>>>>> 2. call onElement on the Trigger
>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>
>>>>>>>>> The next add() call should see the last reference, including any
>>>>>>>>> mutation done in step 3.
>>>>>>>>>
>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>>>>> I had is whether add() on a supplied accumulator of a window and the
>>>>>>>>> onElement() method of the trigger on that window are inline executions on
>>>>>>>>> the same thread, or whether there is any serialization/deserialization (IPC)
>>>>>>>>> that causes this divergence (local versus distributed).
>>>>>>>>>
>>>>>>>>> Regards.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
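A minimal plain-Java sketch of the effect described in the addendum, assuming the window state keeps a serialized copy of each element (as a RocksDB state backend would) rather than the object reference. Java serialization stands in for Flink's own serializers, and the Event class and copyViaSerialization helper are hypothetical. A mutation made after the copy is taken is visible through a shared reference but not through the stored copy, which is consistent with (though not proof of) the local-versus-distributed difference.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCopyDemo {

    // Hypothetical stand-in for the POJO from the thread.
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long waterMark = Long.MIN_VALUE;
    }

    // Round-trips a value through Java serialization, the way a serializing
    // state backend hands back a copy instead of the original reference.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyViaSerialization(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {value seen through the shared reference, value seen through the stored copy}.
    static long[] run() {
        Event element = new Event();
        Event sharedReference = element;                  // same object, nothing serialized in between
        Event storedCopy = copyViaSerialization(element); // serialized-state style: a copy

        element.waterMark = 42L;                          // the "onElement()" mutation

        return new long[] {sharedReference.waterMark, storedCopy.waterMark};
    }

    public static void main(String[] args) {
        long[] seen = run();
        System.out.println("shared reference sees: " + seen[0]);
        System.out.println("stored copy sees:      " + seen[1]);
    }
}
```

The shared reference reports 42, while the stored copy still holds Long.MIN_VALUE.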

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
Makes sense. I took a first stab at using a ProcessFunction. The TimerService
exposed by the Context does not have a method to remove a timer. Is that
primarily because a priority queue is the storage, and removal from a
PriorityQueue is expensive? The TriggerContext does expose a version that can
remove timers, so I was wondering why there is this dissonance...
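On the PriorityQueue question: for java.util.PriorityQueue the JDK documents O(log n) time for offer/poll but linear time for remove(Object), so removal cost is at least a plausible concern. A common workaround is lazy deletion, sketched below in plain Java. LazyTimerQueue is a hypothetical class; it is not Flink's timer service and says nothing about how Flink actually stores timers.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

class LazyTimerQueue {

    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // min-heap of timestamps
    private final Set<Long> registered = new HashSet<>();             // one heap entry per timestamp
    private final Set<Long> cancelled = new HashSet<>();              // deleted but still in the heap

    void register(long timestamp) {
        cancelled.remove(timestamp);          // re-registering revives a deleted timer
        if (registered.add(timestamp)) {
            timers.add(timestamp);            // O(log n)
        }
    }

    void delete(long timestamp) {
        if (registered.contains(timestamp)) {
            cancelled.add(timestamp);         // O(1) instead of the linear PriorityQueue.remove(Object)
        }
    }

    // Pops every timer <= watermark in timestamp order and returns the ones not deleted.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long t = timers.poll();           // O(log n)
            registered.remove(t);
            if (!cancelled.remove(t)) {
                fired.add(t);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        LazyTimerQueue q = new LazyTimerQueue();
        q.register(10);
        q.register(20);
        q.register(30);
        q.delete(20);
        System.out.println(q.advanceWatermark(25)); // 20 was deleted, so only 10 fires
        System.out.println(q.advanceWatermark(40));
    }
}
```

The trade-off is memory: a deleted timer stays in the heap until the watermark passes it.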

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Vishal,
>
> it is not guaranteed that add() and onElement() receive the same object,
> and even if they do it is not guaranteed that a mutation of the object in
> onElement() has an effect. The object might have been serialized and stored
> in RocksDB.
> Hence, elements should not be modified in onElement().
>
> Have you considered implementing the operation completely in a
> ProcessFunction instead of a session window?
> This might be more code, but it is easier to design and reason about because
> there is no interaction between window assigner, trigger, and window function.
>
>
> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> I guess
>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>
>> is where we could shape what is emitted. Again, for us it seems
>> natural to use the WM to materialize micro batches with "approximate" order
>> (and no, I am not a fan of Spark micro batches :)). Any pointers on how we
>> could write an implementation that allows "up till WM" emission through
>> a trigger on a Session Window would be very helpful. In essence, I believe
>> it is crucial for any "funnel" analysis.
>>
>> Something like
>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>
>> I know I am simplifying this and there has to be more to it...
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> The Trigger in this case would be some count-based Trigger... Again, the
>>> motive is to keep the state lean, as we want to search for patterns,
>>> sorted on event time, in the incoming sessionized (and thus of
>>> nondeterministic length) stream...
>>>
>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> For example, this would have worked perfectly if it did not complain
>>>> about MergeableWindow and state. The Session class in this example
>>>> encapsulates the trim-up-to-watermark behavior we desire (a reduce() call
>>>> after telling it the current WM).
>>>>
>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>
>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>
>>>>     @Override
>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>
>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>         for (Event e : elements) {
>>>>             s.add(e);
>>>>         }
>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>         s.reduce();
>>>>         out.collect(s);
>>>>         session.update(s);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void clear(Context context){
>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>         session.clear();
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> Hello Fabian, Thank you for the response.
>>>>>
>>>>> I think that does not work, as it is the WM of the Window Operator
>>>>> that is needed to make deterministic decisions, rather than the WM of an
>>>>> operator that precedes the Window. This is doable using a
>>>>> ProcessWindowFunction with state, but only for non-mergeable windows.
>>>>>
>>>>> The best API option, I think, is a TimeBaseTrigger that fires on every
>>>>> configured progression of the WM, and a Window implementation that
>>>>> materializes *only data up till that WM* (it might hold more data,
>>>>> but that data has event time greater than the WM). I am not sure we have
>>>>> that option built in, and thus was asking for access to the current WM of
>>>>> the window operator, to allow us to handle the "*only data up till that
>>>>> WM*" range retrieval using some custom data structure.
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> the Trigger is not designed to augment records but just to control
>>>>>> when a window is evaluated.
>>>>>> I would recommend using a ProcessFunction to enrich records with the
>>>>>> current watermark before passing them into the window operator.
>>>>>> The context object of the processElement() method gives access to the
>>>>>> current watermark and timestamp of a record.
>>>>>>
>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:
>>>>>>
>>>>>>> An addendum
>>>>>>>
>>>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>>>> Trigger<IN, ...> the same IN as the one provided to add(IN value) in
>>>>>>> Accumulator<IN, ...>? It seems that mutations to IN in onElement() are
>>>>>>> not visible to the Accumulator that holds it as a previous element
>>>>>>> reference, at the next invocation of add(). This seems to happen only in
>>>>>>> distributed mode, which makes sense only if these references point to
>>>>>>> different objects.
>>>>>>>
>>>>>>> The pipeline
>>>>>>>
>>>>>>> .keyBy(keySelector)
>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>> .aggregate(
>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public ACC createAccumulator() {
>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>                 newInstance.resetLocal();
>>>>>>>                 return newInstance;
>>>>>>>             }
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>
>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>
>>>>>>>
>>>>>>>                 accumulator.add(value);
>>>>>>>
>>>>>>>             }
>>>>>>>             .....
>>>>>>>
>>>>>>>    The Trigger
>>>>>>>
>>>>>>>
>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>
>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>
>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>
>>>>>>>         .
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>> specifically to supply the POJO with the watermark from the TriggerContext.
>>>>>>>> The sequence of execution is:
>>>>>>>>
>>>>>>>> 1. call add() on the window's accumulator and save the
>>>>>>>> POJO reference in the Accumulator.
>>>>>>>> 2. call onElement on the Trigger
>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>
>>>>>>>> The next add() call should see the last reference, including any
>>>>>>>> mutation done in step 3.
>>>>>>>>
>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>>>> I had is whether add() on a supplied accumulator of a window and the
>>>>>>>> onElement() method of the trigger on that window are inline executions on
>>>>>>>> the same thread, or whether there is any serialization/deserialization (IPC)
>>>>>>>> that causes this divergence (local versus distributed).
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object,
and even if they do it is not guaranteed that a mutation of the object in
onElement() has an effect. The object might have been serialized and stored
in RocksDB.
Hence, elements should not be modified in onElement().

Have you considered implementing the operation completely in a
ProcessFunction instead of a session window?
This might be more code, but it is easier to design and reason about because
there is no interaction between window assigner, trigger, and window function.
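The ProcessFunction approach can be pictured with a plain-Java simulation. Per-key state holds the open session, processElement() either extends it or closes it when the gap is exceeded, and advancing the watermark plays the role of an event-time timer at lastTimestamp + gap. All names are hypothetical, a TreeMap stands in for keyed state, and events of a key are assumed to arrive in timestamp order; real out-of-order handling would need more care.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ProcessFunctionSessionizer {

    static final class Session {
        final String key;
        final long start;
        long end;
        int count = 1;

        Session(String key, long timestamp) {
            this.key = key;
            this.start = timestamp;
            this.end = timestamp;
        }

        @Override
        public String toString() {
            return key + "[" + start + ".." + end + "]x" + count;
        }
    }

    private final long gap;
    private final Map<String, Session> openSessions = new TreeMap<>(); // stands in for keyed state
    private final List<Session> emitted = new ArrayList<>();

    ProcessFunctionSessionizer(long gap) {
        this.gap = gap;
    }

    // Analogue of processElement(): extend the key's open session, or close it and start a new one.
    void processElement(String key, long timestamp) {
        Session s = openSessions.get(key);
        if (s != null && timestamp > s.end + gap) {
            emitted.add(s);            // gap exceeded: the previous session is complete
            s = null;
        }
        if (s == null) {
            openSessions.put(key, new Session(key, timestamp));
        } else {
            s.end = Math.max(s.end, timestamp);
            s.count++;
        }
    }

    // Analogue of onTimer(): emit sessions whose timer (end + gap) the watermark has reached.
    void advanceWatermark(long watermark) {
        Iterator<Session> it = openSessions.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end + gap <= watermark) {
                emitted.add(s);
                it.remove();
            }
        }
    }

    List<Session> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ProcessFunctionSessionizer p = new ProcessFunctionSessionizer(10);
        p.processElement("a", 1);
        p.processElement("a", 5);
        p.processElement("a", 30); // 30 > 5 + 10, so a new session starts
        p.advanceWatermark(40);    // reaches 30 + 10, closing the second session
        System.out.println(p.emitted());
    }
}
```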


2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> I guess
> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>
> is where we could shape what is emitted. Again, for us it seems
> natural to use the WM to materialize micro batches with "approximate" order
> (and no, I am not a fan of Spark micro batches :)). Any pointers on how we
> could write an implementation that allows "up till WM" emission through
> a trigger on a Session Window would be very helpful. In essence, I believe
> it is crucial for any "funnel" analysis.
>
> Something like
> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>
> I know I am simplifying this and there has to be more to it...
>
>
>
>
> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> The Trigger in this case would be some count-based Trigger... Again, the
>> motive is to keep the state lean, as we want to search for patterns,
>> sorted on event time, in the incoming sessionized (and thus of
>> nondeterministic length) stream...
>>
>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> For example, this would have worked perfectly if it did not complain about
>>> MergeableWindow and state. The Session class in this example encapsulates
>>> the trim-up-to-watermark behavior we desire (a reduce() call after telling
>>> it the current WM).
>>>
>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>
>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>
>>>     @Override
>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>
>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>         Session s = session.value() != null ? session.value() : new Session();
>>>         for (Event e : elements) {
>>>             s.add(e);
>>>         }
>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>         s.reduce();
>>>         out.collect(s);
>>>         session.update(s);
>>>     }
>>>
>>>     @Override
>>>     public void clear(Context context){
>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>         session.clear();
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> Hello Fabian, Thank you for the response.
>>>>
>>>> I think that does not work, as it is the WM of the Window Operator
>>>> that is needed to make deterministic decisions, rather than the WM of an
>>>> operator that precedes the Window. This is doable using a
>>>> ProcessWindowFunction with state, but only for non-mergeable windows.
>>>>
>>>> The best API option, I think, is a TimeBaseTrigger that fires on every
>>>> configured progression of the WM, and a Window implementation that
>>>> materializes *only data up till that WM* (it might hold more data, but
>>>> that data has event time greater than the WM). I am not sure we have that
>>>> option built in, and thus was asking for access to the current WM of the
>>>> window operator, to allow us to handle the "*only data up till that WM*"
>>>> range retrieval using some custom data structure.
>>>>
>>>> Regards.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> the Trigger is not designed to augment records but just to control
>>>>> when a window is evaluated.
>>>>> I would recommend using a ProcessFunction to enrich records with the
>>>>> current watermark before passing them into the window operator.
>>>>> The context object of the processElement() method gives access to the
>>>>> current watermark and timestamp of a record.
>>>>>
>>>>> Please note that watermarks are not deterministic but may depend on
>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>>
>>>>>> An addendum
>>>>>>
>>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>>> Trigger<IN, ...> the same IN as the one provided to add(IN value) in
>>>>>> Accumulator<IN, ...>? It seems that mutations to IN in onElement() are
>>>>>> not visible to the Accumulator that holds it as a previous element
>>>>>> reference, at the next invocation of add(). This seems to happen only in
>>>>>> distributed mode, which makes sense only if these references point to
>>>>>> different objects.
>>>>>>
>>>>>> The pipeline
>>>>>>
>>>>>> .keyBy(keySelector)
>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>> .aggregate(
>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>
>>>>>>             @Override
>>>>>>             public ACC createAccumulator() {
>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>                 newInstance.resetLocal();
>>>>>>                 return newInstance;
>>>>>>             }
>>>>>>
>>>>>>             @Override
>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>
>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>
>>>>>>
>>>>>>                 accumulator.add(value);
>>>>>>
>>>>>>             }
>>>>>>             .....
>>>>>>
>>>>>>    The Trigger
>>>>>>
>>>>>>
>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>
>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>
>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>
>>>>>>         .
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>> specifically to supply the POJO with the watermark from the TriggerContext.
>>>>>>> The sequence of execution is:
>>>>>>>
>>>>>>> 1. call add() on the window's accumulator and save the
>>>>>>> POJO reference in the Accumulator.
>>>>>>> 2. call onElement on the Trigger
>>>>>>> 3. set the watermark on the POJO
>>>>>>>
>>>>>>> The next add() call should see the last reference, including any
>>>>>>> mutation done in step 3.
>>>>>>>
>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>>> I had is whether add() on a supplied accumulator of a window and the
>>>>>>> onElement() method of the trigger on that window are inline executions on
>>>>>>> the same thread, or whether there is any serialization/deserialization (IPC)
>>>>>>> that causes this divergence (local versus distributed).
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
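The note that watermarks may depend on input order follows from how an operator with several input channels tracks event time: its watermark is the minimum of the latest watermarks received on each channel. The plain-Java sketch below (hypothetical names, not Flink code) replays the same two watermarks in two consumption orders; an element processed in between sees 80 in one order and Long.MIN_VALUE in the other.

```java
import java.util.Arrays;

class MinWatermarkDemo {

    // Tracks the latest watermark per input channel; the operator's watermark is the minimum.
    static final class WatermarkTracker {
        private final long[] channelWatermarks;

        WatermarkTracker(int channels) {
            channelWatermarks = new long[channels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
        }

        void onWatermark(int channel, long watermark) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        }

        long current() {
            long min = Long.MAX_VALUE;
            for (long wm : channelWatermarks) {
                min = Math.min(min, wm);
            }
            return min;
        }
    }

    // Same inputs, two consumption orders; returns the watermark an element sees in each.
    static long[] run() {
        WatermarkTracker first = new WatermarkTracker(2);
        first.onWatermark(0, 100);
        first.onWatermark(1, 80);
        long seenFirst = first.current();   // element consumed after both watermarks

        WatermarkTracker second = new WatermarkTracker(2);
        second.onWatermark(0, 100);
        long seenSecond = second.current(); // element consumed before channel 1's watermark
        second.onWatermark(1, 80);

        return new long[] {seenFirst, seenSecond};
    }

    public static void main(String[] args) {
        long[] seen = run();
        System.out.println("order 1: " + seen[0] + ", order 2: " + seen[1]);
    }
}
```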

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
I guess
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could shape what is emitted. Again, for us it seems
natural to use the WM to materialize micro batches with "approximate" order
(and no, I am not a fan of Spark micro batches :)). Any pointers on how we
could write an implementation that allows "up till WM" emission through
a trigger on a Session Window would be very helpful. In essence, I believe
it is crucial for any "funnel" analysis.

Something like
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346

I know I am simplifying this and there has to be more to it...
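A minimal sketch of the "only data up till that WM" retrieval, assuming elements are buffered per key in a sorted map from event timestamp to elements. Each firing drains everything at or below the watermark in event-time order and keeps later elements buffered. UpToWatermarkBuffer is hypothetical plain Java; inside Flink such a buffer would have to live in keyed state (for example a MapState), which this sketch does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class UpToWatermarkBuffer<E> {

    // Event timestamp -> elements with that timestamp, kept sorted by timestamp.
    private final TreeMap<Long, List<E>> byTimestamp = new TreeMap<>();

    void add(long timestamp, E element) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(element);
    }

    // Removes and returns every element with timestamp <= watermark, in event-time order.
    List<E> drainUpTo(long watermark) {
        NavigableMap<Long, List<E>> ready = byTimestamp.headMap(watermark, true);
        List<E> out = new ArrayList<>();
        for (List<E> sameTimestamp : ready.values()) {
            out.addAll(sameTimestamp);
        }
        ready.clear(); // headMap is a view, so this removes the drained entries from byTimestamp
        return out;
    }

    // Number of elements still buffered (event time after the last drained watermark).
    int pending() {
        int n = 0;
        for (List<E> sameTimestamp : byTimestamp.values()) {
            n += sameTimestamp.size();
        }
        return n;
    }

    public static void main(String[] args) {
        UpToWatermarkBuffer<String> buffer = new UpToWatermarkBuffer<>();
        buffer.add(5, "c");
        buffer.add(1, "a");
        buffer.add(12, "d");
        buffer.add(3, "b");
        System.out.println(buffer.drainUpTo(10)); // elements stamped 1, 3 and 5
        System.out.println(buffer.pending());     // the element stamped 12 stays buffered
    }
}
```

With these inputs, drainUpTo(10) returns [a, b, c] and one element remains pending.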




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> The Trigger in this case would be some count-based Trigger... Again, the
> motive is to keep the state lean, as we want to search for patterns,
> sorted on event time, in the incoming sessionized (and thus of
> nondeterministic length) stream...
>
> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> For example, this would have worked perfectly if it did not complain about
>> MergeableWindow and state. The Session class in this example encapsulates
>> the trim-up-to-watermark behavior we desire (a reduce() call after telling
>> it the current WM).
>>
>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>
>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>
>>     @Override
>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>
>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>         Session s = session.value() != null ? session.value() : new Session();
>>         for (Event e : elements) {
>>             s.add(e);
>>         }
>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>         s.reduce();
>>         out.collect(s);
>>         session.update(s);
>>     }
>>
>>     @Override
>>     public void clear(Context context){
>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>         session.clear();
>>     }
>> }
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Hello Fabian, Thank you for the response.
>>>
>>> I think that does not work, as it is the WM of the Window Operator
>>> that is needed to make deterministic decisions, rather than the WM of an
>>> operator that precedes the Window. This is doable using a
>>> ProcessWindowFunction with state, but only for non-mergeable windows.
>>>
>>> The best API option, I think, is a TimeBaseTrigger that fires on every
>>> configured progression of the WM, and a Window implementation that
>>> materializes *only data up till that WM* (it might hold more data, but
>>> that data has event time greater than the WM). I am not sure we have that
>>> option built in, and thus was asking for access to the current WM of the
>>> window operator, to allow us to handle the "*only data up till that WM*"
>>> range retrieval using some custom data structure.
>>>
>>> Regards.
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> the Trigger is not designed to augment records but just to control when
>>>> a window is evaluated.
>>>> I would recommend using a ProcessFunction to enrich records with the
>>>> current watermark before passing them into the window operator.
>>>> The context object of the processElement() method gives access to the
>>>> current watermark and timestamp of a record.
>>>>
>>>> Please note that watermarks are not deterministic but may depend on the
>>>> order in which parallel inputs are consumed by an operator.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>
>>>>> An addendum
>>>>>
>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>> Trigger<IN, ...> the same IN as the one provided to add(IN value) in
>>>>> Accumulator<IN, ...>? It seems that mutations to IN in onElement() are
>>>>> not visible to the Accumulator that holds it as a previous element
>>>>> reference, at the next invocation of add(). This seems to happen only in
>>>>> distributed mode, which makes sense only if these references point to
>>>>> different objects.
>>>>>
>>>>> The pipeline
>>>>>
>>>>> .keyBy(keySelector)
>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>> .aggregate(
>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>
>>>>>             @Override
>>>>>             public ACC createAccumulator() {
>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>                 newInstance.resetLocal();
>>>>>                 return newInstance;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void add(IN value, ACC accumulator) {
>>>>>
>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>
>>>>>
>>>>>                 accumulator.add(value);
>>>>>
>>>>>             }
>>>>>             .....
>>>>>
>>>>>    The Trigger
>>>>>
>>>>>
>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>
>>>>>
>>>>>     @Override
>>>>>
>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>
>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>
>>>>>         .
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> I want to augment a POJO in the Trigger's onElement method, specifically
>>>>>> to supply the POJO with the watermark from the TriggerContext. The
>>>>>> sequence of execution is:
>>>>>>
>>>>>> 1. call add() on the window's accumulator and save the
>>>>>> POJO reference in the Accumulator.
>>>>>> 2. call onElement on the Trigger
>>>>>> 3. set the watermark on the POJO
>>>>>>
>>>>>> The next add() call should see the last reference, including any mutation
>>>>>> done in step 3.
>>>>>>
>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that I
>>>>>> have access to the mutation made by onElement() on the POJO in the
>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>> I had is whether add() on a supplied accumulator of a window and the
>>>>>> onElement() method of the trigger on that window are inline executions on
>>>>>> the same thread, or whether there is any serialization/deserialization (IPC)
>>>>>> that causes this divergence (local versus distributed).
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
The Trigger in this case would be some count-based Trigger... Again, the
motive is to keep the state lean, as we want to search for patterns,
sorted on event time, in the incoming sessionized (and thus of
nondeterministic length) stream...
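One reading of the count-based idea, as a plain-Java sketch. Every fireEvery elements, the buffer is sorted by event time, scanned against an ordered list of funnel steps, and then cleared, so the only state carried between firings is the number of steps matched so far. CountFiredFunnel and its step names are hypothetical. Events that arrive after their batch has fired are simply processed in a later batch; that is the ordering gap a watermark-based variant is meant to close.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class CountFiredFunnel {

    static final class Event {
        final long timestamp;
        final String step;

        Event(long timestamp, String step) {
            this.timestamp = timestamp;
            this.step = step;
        }
    }

    private final List<String> funnel;        // ordered steps to match, e.g. view -> cart -> buy
    private final int fireEvery;              // count threshold of the hypothetical trigger
    private final List<Event> buffer = new ArrayList<>();
    private int matchedSteps = 0;             // the only state kept across firings

    CountFiredFunnel(List<String> funnel, int fireEvery) {
        this.funnel = funnel;
        this.fireEvery = fireEvery;
    }

    // Buffers the element; returns true if this element made the count trigger fire.
    boolean onElement(long timestamp, String step) {
        buffer.add(new Event(timestamp, step));
        if (buffer.size() < fireEvery) {
            return false;
        }
        buffer.sort(Comparator.comparingLong((Event e) -> e.timestamp)); // event-time order
        for (Event e : buffer) {
            if (matchedSteps < funnel.size() && funnel.get(matchedSteps).equals(e.step)) {
                matchedSteps++;
            }
        }
        buffer.clear();                       // keep state lean: drop the raw events
        return true;
    }

    int matchedSteps() {
        return matchedSteps;
    }

    public static void main(String[] args) {
        CountFiredFunnel f = new CountFiredFunnel(Arrays.asList("view", "cart", "buy"), 3);
        f.onElement(2, "cart");
        f.onElement(1, "view");
        f.onElement(5, "home");               // fires: sorted batch is view, cart, home
        System.out.println("after batch 1: " + f.matchedSteps());
        f.onElement(9, "buy");
        f.onElement(7, "home");
        f.onElement(8, "home");               // fires: sorted batch is home, home, buy
        System.out.println("after batch 2: " + f.matchedSteps());
    }
}
```

Sorting matters here: in arrival order the first batch (cart, view, home) would match only one step instead of two.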

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> For example, this would have worked perfectly if it did not complain about
> MergeableWindow and state. The Session class in this example encapsulates
> the trim-up-to-watermark behavior we desire (a reduce() call after telling
> it the current WM).
>
> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>
>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>
>     @Override
>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>
>         ValueState<Session> session = context.windowState().getState(sessionState);
>         Session s = session.value() != null ? session.value() : new Session();
>         for (Event e : elements) {
>             s.add(e);
>         }
>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>         s.reduce();
>         out.collect(s);
>         session.update(s);
>     }
>
>     @Override
>     public void clear(Context context){
>         ValueState<Session> session = context.windowState().getState(sessionState);
>         session.clear();
>     }
> }
>
>
>
>
> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> Hello Fabian, Thank you for the response.
>>
>> I think that does not work, as it is the WM of the Window Operator
>> that is needed to make deterministic decisions, rather than the WM of an
>> operator that precedes the Window. This is doable using a
>> ProcessWindowFunction with state, but only for non-mergeable windows.
>>
>> The best API option, I think, is a TimeBaseTrigger that fires on every
>> configured progression of the WM, and a Window implementation that
>> materializes *only data up till that WM* (it might hold more data, but
>> that data has event time greater than the WM). I am not sure we have that
>> option built in, and thus was asking for access to the current WM of the
>> window operator, to allow us to handle the "*only data up till that WM*"
>> range retrieval using some custom data structure.
>>
>> Regards.
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> the Trigger is not designed to augment records but just to control when
>>> a window is evaluated.
>>> I would recommend using a ProcessFunction to enrich records with the
>>> current watermark before passing them into the window operator.
>>> The context object of the processElement() method gives access to the
>>> current watermark and timestamp of a record.
>>>
>>> Please note that watermarks are not deterministic but may depend on the
>>> order in which parallel inputs are consumed by an operator.
>>>
>>> Best, Fabian
>>>
>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>
>>>> An addendum
>>>>
>>>> Is the element reference IN in onElement(IN element, ...) in
>>>> Trigger<IN, ...> the same IN as the one provided to add(IN value) in
>>>> Accumulator<IN, ...>? It seems that mutations to IN in onElement() are
>>>> not visible to the Accumulator that holds it as a previous element
>>>> reference, at the next invocation of add(). This seems to happen only in
>>>> distributed mode, which makes sense only if these references point to
>>>> different objects.
>>>>
>>>> The pipeline
>>>>
>>>> .keyBy(keySelector)
>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>> .aggregate(
>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>
>>>>             @Override
>>>>             public ACC createAccumulator() {
>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>                 newInstance.resetLocal();
>>>>                 return newInstance;
>>>>             }
>>>>
>>>>             @Override
>>>>             public void add(IN value, ACC accumulator) {
>>>>
>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>
>>>>
>>>>                 accumulator.add(value);
>>>>
>>>>             }
>>>>             .....
>>>>
>>>>    The Trigger
>>>>
>>>>
>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>
>>>>
>>>>     @Override
>>>>
>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>
>>>>         /** The element T is mutated to carry the watermark **/
>>>>         *element.setWaterMark(ctx.getCurrentWatermark());*
>>>>
>>>>         .
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> I want to augment a POJO in  Trigger's onElement method, specifically
>>>>> supply the POJO with the watermark from the TriggerContext. The sequence of
>>>>> execution is this sequence
>>>>>
>>>>> 1. call to add() in the accumulator for the window  and save the POJO
>>>>> reference in the Accumulator.
>>>>> 2. call to onElement on Tigger
>>>>> 3. set watermark to the POJO
>>>>>
>>>>> The next add() method should have the last reference and any mutation
>>>>> done in step 3.
>>>>>
>>>>> That works in a local test case, using LocalFlinkMiniCluster, as in I
>>>>> have access to the mutation by the onElement() in the POJO in the
>>>>> subsequent add(),  but not on a distributed cluster. The specific question
>>>>> I had is whether  add() on a supplied accumulator on a window and
>>>>> onElement() method of the trigger on that window are inline executions, on
>>>>> the same thread or is there any serialization/deserialization IPC that
>>>>> causes these divergence ( local versus distributed )
>>>>>
>>>>> Regards.
>>>>>
>>>>
>>>>
>>>
>>
>

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
For example, this would have worked perfectly if it did not complain about
MergeableWindow and state. The Session class here encapsulates the
trim-up-to-watermark behavior we desire (a reduce() call after telling it
the current WM).

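import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Event and Session are application classes (not shown). context.windowState()
// is per-window state, which Flink rejects for merging windows such as
// event-time session windows; that is the complaint described above.
public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState =
            new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements,
                        Collector<Session> out) throws Exception {
        // Resume the Session built by earlier firings of this window, if any.
        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value();
        if (s == null) {
            s = new Session();
        }
        for (Event e : elements) {
            s.add(e);
        }
        // Tell the Session the window operator's current watermark, then trim up to it.
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce();
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) throws Exception {
        // Drop the per-window state when the window is purged.
        context.windowState().getState(sessionState).clear();
    }
}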
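Why per-window state and session windows clash can be illustrated with a small, Flink-free model. As I understand Flink's event-time session windows, each element at timestamp ts is first assigned the window [ts, ts + gap), and windows that overlap or touch are then merged into one, so any state attached to one of the original windows would have to be merged as well. SessionWindowSketch, Window and assignAndMerge are hypothetical names, not Flink API; the sketch only models assignment and merging under that assumed overlap rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of event-time session window assignment and merging.
final class SessionWindowSketch {

    // A half-open time window [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Assigns [ts, ts + gap) to every timestamp, then merges windows that
    // overlap or touch, scanning them in start order.
    static List<Window> assignAndMerge(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Window> merged = new ArrayList<>();
        for (long ts : sorted) {
            Window w = new Window(ts, ts + gap);
            int lastIdx = merged.size() - 1;
            if (lastIdx >= 0 && w.start <= merged.get(lastIdx).end) {
                // The new window overlaps the current session: extend that session.
                Window last = merged.get(lastIdx);
                merged.set(lastIdx, new Window(last.start, Math.max(last.end, w.end)));
            } else {
                // A gap separates this element from the previous session.
                merged.add(w);
            }
        }
        return merged;
    }
}
```

For example, with a gap of 10 the timestamps 1, 5 and 30 produce [1, 11), [5, 15) and [30, 40), of which the first two merge into [1, 15).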




On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> Hello Fabian, thank you for the response.
>
> I think that does not work: to make deterministic decisions we need the WM
> of the window operator, not the WM of an operator that precedes the window.
> This is doable with a ProcessWindowFunction using state, but only for
> non-mergeable windows.
>
> The best API option, I think, is a TimeBaseTrigger that fires every
> configured time progression of the WM, and a Window implementation that
> materializes *only data up till that WM* (it might hold more data, but
> that data has event time greater than the WM). I am not sure there is such
> a built-in option, so I was asking for access to the current WM of the
> window operator, to allow us to handle the *only data up till that WM*
> range retrieval using some custom data structure.
>
> Regards.
>

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
Hello Fabian, thank you for the response.

I think that does not work: to make deterministic decisions we need the WM
of the window operator, not the WM of an operator that precedes the window.
This is doable with a ProcessWindowFunction using state, but only for
non-mergeable windows.

The best API option, I think, is a TimeBaseTrigger that fires every
configured time progression of the WM, and a Window implementation that
materializes *only data up till that WM* (it might hold more data, but that
data has event time greater than the WM). I am not sure there is such a
built-in option, so I was asking for access to the current WM of the window
operator, to allow us to handle the *only data up till that WM* range
retrieval using some custom data structure.
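The "only data up till that WM" range retrieval could be backed by a structure ordered by event time. A minimal sketch, assuming a hypothetical WatermarkBoundedBuffer class that is not part of Flink: events are kept in a java.util.TreeMap keyed by event time, and each firing drains only the entries at or below the watermark, leaving later events buffered for a future firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical buffer (not a Flink class) that releases only events whose
// event time is at or below the current watermark.
final class WatermarkBoundedBuffer<E> {

    // Buffered events grouped by event-time timestamp, in ascending order.
    private final TreeMap<Long, List<E>> byTime = new TreeMap<>();

    void add(long eventTime, E event) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns, in event-time order, every event with eventTime <= watermark.
    List<E> drainUpTo(long watermark) {
        NavigableMap<Long, List<E>> ready = byTime.headMap(watermark, true);
        List<E> out = new ArrayList<>();
        for (List<E> events : ready.values()) {
            out.addAll(events);
        }
        ready.clear(); // the headMap view is backed by byTime, so this removes the drained entries
        return out;
    }

    // Number of events still waiting for a later watermark.
    int pending() {
        int n = 0;
        for (List<E> events : byTime.values()) {
            n += events.size();
        }
        return n;
    }
}
```

For example, after adding events at times 5, 1 and 12, drainUpTo(10) returns the events at 1 and 5 in time order and leaves the one at 12 pending until a later watermark passes it.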

Regards.




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Vishal,
>
> the Trigger is not designed to augment records but just to control when a
> window is evaluated.
> I would recommend using a ProcessFunction to enrich records with the
> current watermark before passing them into the window operator.
> The context object of the processElement() method gives access to the
> current watermark and timestamp of a record.
>
> Please note that watermarks are not deterministic but may depend on the
> order in which parallel inputs are consumed by an operator.
>
> Best, Fabian
>

Re: A question about Triggers

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

the Trigger is not designed to augment records but just to control when a
window is evaluated.
I would recommend using a ProcessFunction to enrich records with the
current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the
current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the
order in which parallel inputs are consumed by an operator.

Best, Fabian
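The remark that watermarks depend on the order in which parallel inputs are consumed can be illustrated with a Flink-free model. In Flink, an operator's current watermark is the minimum of the latest watermarks received on its input channels (so it stays at Long.MIN_VALUE until every channel has reported), and inside processElement() a ProcessFunction can read it via ctx.timerService().currentWatermark(). MinWatermarkTracker below is a hypothetical stand-in for that behavior, not Flink API; enrich() stamps a record with whatever watermark is current when the record arrives.

```java
import java.util.Arrays;

// Hypothetical, Flink-free model of an operator with several input channels.
final class MinWatermarkTracker {

    // Latest watermark seen on each input channel.
    private final long[] channelWatermarks;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input channel; watermarks only move forward.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's current watermark: the minimum over all input channels.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // Stamps a record with the watermark the operator holds when the record arrives.
    String enrich(String record) {
        return record + "@wm=" + currentWatermark();
    }
}
```

With two channels that eventually report watermarks 10 and 8, a record processed after both watermarks is stamped with 8, while the same record processed after only the first one is stamped with Long.MIN_VALUE.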

2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> An addendum
>
> Is the element reference IN in onElement(IN element, ..) in
> Trigger<IN, ..> the same IN as the one provided to add(IN value) in
> Accumulator<IN, ..>? It seems that mutations to IN made in onElement() are
> not visible to the Accumulator that carries it as a previous-element
> reference when it is accessed in the next invocation of add(). This seems
> to happen only in distributed mode, which makes sense only if these
> references point to different objects.
>

Re: A question about Triggers

Posted by Vishal Santoshi <vi...@gmail.com>.
An addendum

Is the element reference IN in onElement(IN element, ..) in Trigger<IN, ..>
the same IN as the one provided to add(IN value) in Accumulator<IN, ..>? It
seems that mutations to IN made in onElement() are not visible to the
Accumulator that carries it as a previous-element reference when it is
accessed in the next invocation of add(). This seems to happen only in
distributed mode, which makes sense only if these references point to
different objects.
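The suspicion that the two references point to different objects can be illustrated without Flink. A minimal sketch, assuming (not verified here) that the accumulator is stored in serialized form between calls, as a state backend that keeps state as bytes would do: the accumulator holding the element is written out and read back after add(), so a later mutation of the original element is not visible through the stored copy. EventPojo, LastElementAcc and ReferenceCopySketch are hypothetical names, and plain Java serialization stands in for whatever serializer Flink would actually use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical element POJO carrying a watermark field (stands in for IN).
final class EventPojo implements Serializable {
    long waterMark = Long.MIN_VALUE;
}

// Hypothetical accumulator that keeps a reference to the last element added.
final class LastElementAcc implements Serializable {
    EventPojo last;
}

final class ReferenceCopySketch {

    // Serializes and deserializes a value, yielding a distinct copy of its object graph.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    // Replays: add() keeps the reference, the accumulator is stored, onElement()
    // mutates the element, and the next add() reads the stored accumulator.
    static long watermarkSeenByNextAdd(boolean serializeState) {
        EventPojo element = new EventPojo();
        LastElementAcc acc = new LastElementAcc();
        acc.last = element;                            // step 1: add() keeps the reference
        LastElementAcc stored = serializeState
                ? roundTrip(acc)                       // state kept as bytes: a copy
                : acc;                                 // state kept on the heap: same object
        element.waterMark = 42L;                       // steps 2-3: onElement() mutates the element
        return stored.last.waterMark;                  // what the next add() would see
    }
}
```

watermarkSeenByNextAdd(false) returns 42 because the accumulator still points at the mutated object, while watermarkSeenByNextAdd(true) returns Long.MIN_VALUE because the copy was taken before onElement() ran. Setting the watermark on the record before it reaches the window operator, as suggested in this thread, avoids depending on object identity.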

The pipeline

.keyBy(keySelector)
.window(EventTimeSessionWindows.withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
.aggregate(
        new AggregateFunction<IN, ACC, OUT>() {

            @Override
            public ACC createAccumulator() {
                // Start each window from a fresh copy of the prototype accumulator.
                ACC newInstance = (ACC) accumulator.clone();
                newInstance.resetLocal();
                return newInstance;
            }

            @Override
            public void add(IN value, ACC accumulator) {
                // This method is called before onElement() of the Trigger and
                // keeps the reference to the last IN.
                accumulator.add(value);
            }
            // ...

The Trigger

public class CountBasedWMAugmentationTrigger<
        T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
        W extends Window> extends Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window,
                                   TriggerContext ctx) throws Exception {
        // The element T is mutated to carry the watermark.
        element.setWaterMark(ctx.getCurrentWatermark());
        // ...





On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vi...@gmail.com> wrote:

> I want to augment a POJO in the Trigger's onElement() method, specifically
> to supply the POJO with the watermark from the TriggerContext. The sequence
> of execution is:
>
> 1. call to add() in the accumulator for the window, which saves the POJO
> reference in the Accumulator.
> 2. call to onElement() on the Trigger
> 3. set the watermark on the POJO
>
> The next add() call should see the last reference, including any mutation
> done in step 3.
>
> That works in a local test case using LocalFlinkMiniCluster (I can see the
> mutation made by onElement() on the POJO in the subsequent add()), but not
> on a distributed cluster. My specific question is whether add() on a
> window's supplied accumulator and onElement() of that window's trigger are
> executed inline, on the same thread, or whether some
> serialization/deserialization or IPC causes this divergence (local versus
> distributed).
>
> Regards.
>