Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2017/11/08 20:43:24 UTC

How to best create a bounded session window ?

I am implementing a bounded session window, but I need to short-circuit
the session if its length (in event count or in time) goes beyond
a configured limit, a very reasonable scenario (bots, etc.). I am using
the approach listed below. I am not sure, though, whether the window itself
is being terminated, or whether that is even feasible. Is there any other
approach or advice?

import java.util.Date;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long maxSessionTime;

    // The flag is scoped to the current key and window, so it is looked up through
    // the TriggerContext on every call instead of being cached in a field.
    // Note: it is a ValueState, which OnMergeContext.mergePartitionedState cannot
    // merge (that method only accepts MergingState), so the flag may not carry
    // over when session windows merge.
    private final ValueStateDescriptor<Boolean> doneStateDescriptor =
            new ValueStateDescriptor<>("done", Boolean.class);

    private BoundedEventTimeTrigger(long maxSessionTime) {
        this.maxSessionTime = maxSessionTime;
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end
     * of the window, or fires and purges earlier, as soon as an element arrives
     * more than {@code maxSessionLength} milliseconds after the window start.
     */
    public static BoundedEventTimeTrigger create(long maxSessionLength) {
        return new BoundedEventTimeTrigger(maxSessionLength);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Boolean> doneState = ctx.getPartitionedState(doneStateDescriptor);
        if (Boolean.TRUE.equals(doneState.value())) {
            // The session was already cut short for this window: ignore the element.
            return TriggerResult.CONTINUE;
        }
        if (timestamp - window.getStart() > maxSessionTime) {
            // Debug output: element timestamp and window start.
            System.out.println(new Date(timestamp) + "\t" + new Date(window.getStart()));
            doneState.update(true);
            return TriggerResult.FIRE_AND_PURGE;
        }

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        // Also drop the per-window flag so it does not outlive the window.
        ctx.getPartitionedState(doneStateDescriptor).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        // Re-register the timer for the end of the merged window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "BoundedEventTimeTrigger(" + maxSessionTime + ")";
    }

}

Re: How to best create a bounded session window ?

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Indeed, you are unfortunately right. Triggers do not define or control the lifecycle of the window, so it could happen that each new event keeps pushing the leading boundary of the window forward, while your custom trigger keeps firing and purging that single event (because the max window length has been exceeded). So my example of events:
1 2 3 4 6 7 8 5
would probably result in the following fired windows:
[1 2 3 4 6] (window time from 1 to 6) +
[7] (window time from 1 to 7) +
[8] (window time from 1 to 8) +
[5] (window time from 1 to 8)

I’m not sure if you can work around this issue. You would have to either implement a custom WindowOperator that behaves differently, or copy the code and add a new TriggerResult, FIRE_PURGE_AND_DROP_WINDOW. The latter could maybe be contributed back to Flink (it should be discussed in a ticket first).
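
To make the sequence above concrete, here is a small plain-Java model of it. It is only a sketch and does not use Flink: the class and method names are made up for illustration, the limit check uses >= so that the output matches the illustration (the posted trigger uses >), and the final flush stands in for the watermark eventually passing the end of the window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a trigger that fires and purges while the window stays alive.
class BoundedSessionSimulation {

    /**
     * Replays timestamps in arrival order into one session window that is never
     * dropped. The trigger fires and purges whenever an element lies maxLength
     * or more past the window start.
     */
    static List<List<Long>> simulate(long[] events, long maxLength) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        long windowStart = Long.MAX_VALUE;

        for (long ts : events) {
            // Purging clears the contents only; the window start never resets.
            windowStart = Math.min(windowStart, ts);
            buffer.add(ts);
            if (ts - windowStart >= maxLength) {
                fired.add(new ArrayList<>(buffer)); // FIRE
                buffer.clear();                     // PURGE
            }
        }
        // Stand-in for the watermark finally passing the window end.
        if (!buffer.isEmpty()) {
            fired.add(new ArrayList<>(buffer));
        }
        return fired;
    }

    public static void main(String[] args) {
        // Prints [[1, 2, 3, 4, 6], [7], [8], [5]]
        System.out.println(simulate(new long[] {1, 2, 3, 4, 6, 7, 8, 5}, 5));
    }
}
```

Because the window start stays at 1 after every purge, each later element beyond the limit is fired on its own, which is the behavior a dropped window would avoid.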

Piotrek

> On 9 Nov 2017, at 15:27, Vishal Santoshi <vi...@gmail.com> wrote:
> 
> Thank you for the response.
> 
> I would not mind the second scenario, as in a second window, which your illustration suggests is possible with a custom trigger. I am not certain, though, that triggers define the lifecycle of a window: a trigger firing does not necessarily imply a garbage-collectable window. The window should be GCed only after the watermark exceeds the window's leading boundary by a lag, and that boundary is hypothetically ever increasing. In some cases that might never happen, because the leading boundary keeps increasing forever. We may decide to FIRE_AND_PURGE, FIRE, etc., but the window remains live. Or did I get that part wrong?
> 
> 
> Vishal.

Re: How to best create a bounded session window ?

Posted by Vishal Santoshi <vi...@gmail.com>.
Thank you for the response.

I would not mind the second scenario, as in a second window, which your
illustration suggests is possible with a custom trigger. I am not certain,
though, that triggers define the lifecycle of a window: a trigger firing
does not necessarily imply a garbage-collectable window. The window should
be GCed only after the watermark exceeds the window's leading boundary by a
lag, and that boundary is hypothetically ever increasing. In some cases that
might never happen, because the leading boundary keeps increasing forever.
We may decide to FIRE_AND_PURGE, FIRE, etc., but the window remains live.
Or did I get that part wrong?
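
The concern can be sketched with a small plain-Java model (not Flink code; the class, method names, and the simplified watermark are assumptions for illustration). It takes the event-time cleanup point to be the window's max timestamp plus the allowed lateness, and extends the session end to timestamp + gap for every element that joins.

```java
// Plain-Java model of when a session window becomes eligible for cleanup.
class SessionCleanupModel {

    /** Cleanup point: last timestamp covered by the window plus the allowed lateness. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return (windowEnd - 1) + allowedLateness;
    }

    /**
     * Returns true if the watermark ever reaches the cleanup point of the open
     * session before the next element extends it. Timestamps are assumed to be
     * in order, and the watermark is modeled as the latest timestamp minus a lag.
     */
    static boolean cleanedUpWhileStreaming(long[] timestamps, long gap,
                                           long allowedLateness, long watermarkLag) {
        boolean open = false;
        long windowEnd = 0;
        for (long ts : timestamps) {
            long watermark = ts - watermarkLag;
            if (open && watermark >= cleanupTime(windowEnd, allowedLateness)) {
                return true; // the session was garbage collected before this element
            }
            // The element joins the session and pushes its end forward.
            windowEnd = open ? Math.max(windowEnd, ts + gap) : ts + gap;
            open = true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] bot = new long[100];
        for (int i = 0; i < bot.length; i++) {
            bot[i] = i; // one event per time unit, always inside the gap
        }
        System.out.println(cleanedUpWhileStreaming(bot, 10, 0, 0));
        System.out.println(cleanedUpWhileStreaming(new long[] {0, 1, 2, 50}, 10, 0, 0));
    }
}
```

In this model a source that keeps emitting within the gap never lets the watermark catch up with the cleanup point, regardless of how often the trigger fires or purges.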


Vishal.




On Thu, Nov 9, 2017 at 8:24 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> It might be more complicated if you want to take into account events
> coming in out of order. For example, say you limit the length of a window
> to 5 and you get the following events:
>
> 1 2 3 4 6 7 8 5
>
> Do you want to emit windows:
>
> [1 2 3 4 5] (length limit exceeded) + [6 7 8] ?
>
> Or are you fine with interleaving windows when events arrive out of order:
>
> [1 2 3 4 6] + [5 7 8]
>
> If the latter, a custom Trigger should be enough for you. If not, you
> would need to implement a hypothetical MergingAndSplitableWindowAssigner
> that, after encountering the late event “5”, could split previously created
> windows. Unfortunately such a feature is not supported by the WindowOperator,
> so you would have to implement your own operator for this.
>
> Whichever option you choose, remember to write some integration tests:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
>
> Piotrek

Re: How to best create a bounded session window ?

Posted by Piotr Nowojski <pi...@data-artisans.com>.
It might be more complicated if you want to take into account events coming in out of order. For example, say you limit the length of a window to 5 and you get the following events:

1 2 3 4 6 7 8 5

Do you want to emit windows:

[1 2 3 4 5] (length limit exceeded) + [6 7 8] ?

Or are you fine with interleaving windows when events arrive out of order:

[1 2 3 4 6] + [5 7 8] 

If the latter, a custom Trigger should be enough for you. If not, you would need to implement a hypothetical MergingAndSplitableWindowAssigner that, after encountering the late event “5”, could split previously created windows. Unfortunately such a feature is not supported by the WindowOperator, so you would have to implement your own operator for this.
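
The difference between the two outcomes can be reproduced with a short plain-Java sketch (no Flink involved; the class and method names are invented for illustration, and the limit is read here as a count of five events). Cutting after sorting by timestamp gives the first result, cutting in arrival order gives the interleaved one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Compares splitting by timestamp order with splitting by arrival order.
class WindowSplitComparison {

    /** Cuts the sequence into consecutive chunks of at most maxCount elements. */
    static List<List<Integer>> chunk(List<Integer> events, int maxCount) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int i = 0; i < events.size(); i += maxCount) {
            int end = Math.min(i + maxCount, events.size());
            windows.add(new ArrayList<>(events.subList(i, end)));
        }
        return windows;
    }

    /** Sort first, then cut: the late "5" must be known before anything is emitted. */
    static List<List<Integer>> splitInTimestampOrder(List<Integer> arrivals, int maxCount) {
        List<Integer> sorted = new ArrayList<>(arrivals);
        Collections.sort(sorted);
        return chunk(sorted, maxCount);
    }

    /** Cut in arrival order: windows may interleave; each one is sorted for display. */
    static List<List<Integer>> splitInArrivalOrder(List<Integer> arrivals, int maxCount) {
        List<List<Integer>> windows = chunk(arrivals, maxCount);
        for (List<Integer> window : windows) {
            Collections.sort(window);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> arrivals = Arrays.asList(1, 2, 3, 4, 6, 7, 8, 5);
        System.out.println(splitInTimestampOrder(arrivals, 5)); // [[1, 2, 3, 4, 5], [6, 7, 8]]
        System.out.println(splitInArrivalOrder(arrivals, 5));   // [[1, 2, 3, 4, 6], [5, 7, 8]]
    }
}
```

The first variant has to hold back or revise the first window until the late element shows up, which is why it needs more than a trigger.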

Whichever option you choose, remember to write some integration tests:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing

Piotrek
