Posted to user@flink.apache.org by Abdul Salam Shaikh <ab...@gmail.com> on 2017/01/22 22:07:53 UTC

Expected behaviour of windows

Hi,

I needed some clarity on the behaviour of the windows for my use case.
I have defined my stream as follows:

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStream<String> live = env.addSource(new JsonTestSource());
        DataStream<FlatObject> jsonToTuple = live.flatMap(new Splitter());

        KeyedStream<FlatObject, String> keyStream = jsonToTuple.keyBy(
                new KeySelector<FlatObject, String>() {
                    public String getKey(FlatObject value) throws Exception {
                        return value.getIntersectionName();
                    }
                });

        DataStream<FlatObject> flatStream = keyStream
                .window(GlobalWindows.create())
                .trigger(new WindowCustomTrigger())
                .apply(new TrafficWindow());
        flatStream.print();

*For a given set of JSON objects (ideal case):*

{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
------------------------------------------------------------ Trigger
(because CurrentTimeInCycle of the current event is lower than that of the
previous event)
{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
------------------------------------------------------------ Trigger
(because CurrentTimeInCycle of the current event is lower than that of the
previous event)

*In my current program, the output is as follows (all the objects from the
previous window become part of the next window, and they keep
accumulating):*

{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
------------------------------------------------------------ Trigger
(because CurrentTimeInCycle of the current event is lower than that of the
previous event)
{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
------------------------------------------------------------ Trigger
(because CurrentTimeInCycle of the current event is lower than that of the
previous event)


I am not sure whether this is the expected behaviour of the windows. Is
there anything I can do to make my program behave as in the ideal case
mentioned above?

Thanks in anticipation!
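
A minimal plain-Java sketch of the ideal case described above, for reference. It only models the desired grouping: a batch is closed by the first reading that is lower than its predecessor, and that reading stays in the batch it closes. The class and method names (CycleSplitter, split) are hypothetical, and nothing here uses the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the desired grouping; hypothetical names, not Flink API.
final class CycleSplitter {

    // Splits CurrentTimeInCycle readings into batches. A batch is emitted as
    // soon as a reading is lower than the previous one, and the buffer then
    // starts again from empty.
    static List<List<Integer>> split(List<Integer> timesInCycle) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long lastKnown = 0;
        for (int t : timesInCycle) {
            current.add(t);
            if (lastKnown > t) {
                batches.add(current);         // emit the finished batch
                current = new ArrayList<>();  // drop the emitted readings
                lastKnown = 0;
            } else {
                lastKnown = t;
            }
        }
        // Readings after the last drop stay buffered and are not emitted.
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(20, 30, 40, 60, 10, 20, 30, 40, 60, 5);
        // prints [[20, 30, 40, 60, 10], [20, 30, 40, 60, 5]]
        System.out.println(split(input));
    }
}
```

The second batch contains only its own five readings, which is the difference from the accumulating output shown above.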

Re: Expected behaviour of windows

Posted by Abdul Salam Shaikh <ab...@gmail.com>.
The exception occurred because I had left the clear() method of my custom
trigger (WindowCustomTrigger) unimplemented.

It works as expected now. Thanks for all the help :)
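
For readers hitting the same error: the stack trace reported in this thread points at WindowCustomTrigger.clear throwing UnsupportedOperationException("Not supported yet."), which looks like an auto-generated method stub and not a missing Flink feature. The sketch below is a plain-Java model of that situation, assuming the clean-up step after a purge calls the trigger's clear(). All names in it (ClearDemo, TriggerModel, StubbedTrigger, FixedTrigger, purge) are hypothetical and none of it is Flink API.

```java
// Plain-Java model; every name here is hypothetical and none of it is Flink API.
final class ClearDemo {

    abstract static class TriggerModel {
        long lastKnownCurrentTimeInCycle = 0;

        // Called by the modelled clean-up step when a window is purged.
        abstract void clear();
    }

    static final class StubbedTrigger extends TriggerModel {
        @Override
        void clear() {
            // A typical auto-generated method body that was never filled in.
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    static final class FixedTrigger extends TriggerModel {
        @Override
        void clear() {
            // Nothing to release except the remembered reading.
            lastKnownCurrentTimeInCycle = 0;
        }
    }

    // Models the clean-up after a purge and reports whether it completed.
    static boolean purge(TriggerModel trigger) {
        try {
            trigger.clear();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(purge(new StubbedTrigger())); // prints false
        System.out.println(purge(new FixedTrigger()));   // prints true
    }
}
```

In the real job the exception is not caught, so the task fails as soon as the first window is purged.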

On Tue, Jan 24, 2017 at 12:23 AM, Abdul Salam Shaikh <
abd.salam.shaikh@gmail.com> wrote:

> This is my definition of the trigger, to make the issue I am running into
> clearer:
>
>     @Override
>     public TriggerResult onElement(FlatObject t, long l, Window w,
>             TriggerContext tc) throws Exception {
>         long currentTimeInCycle = t.getCurrentTimeInCycle();
>         if (lastKnownCurrentTimeInCycle > currentTimeInCycle) {
>             lastKnownCurrentTimeInCycle = 0;
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>         lastKnownCurrentTimeInCycle = currentTimeInCycle;
>         return TriggerResult.CONTINUE;
>     }


-- 
Thanks & Regards,

*Abdul Salam Shaikh*

Re: Expected behaviour of windows

Posted by Abdul Salam Shaikh <ab...@gmail.com>.
This is my definition of the trigger, to make the issue I am running into
clearer:

    @Override
    public TriggerResult onElement(FlatObject t, long l, Window w,
            TriggerContext tc) throws Exception {
        long currentTimeInCycle = t.getCurrentTimeInCycle();
        if (lastKnownCurrentTimeInCycle > currentTimeInCycle) {
            lastKnownCurrentTimeInCycle = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        lastKnownCurrentTimeInCycle = currentTimeInCycle;
        return TriggerResult.CONTINUE;
    }

On Mon, Jan 23, 2017 at 10:02 PM, Abdul Salam Shaikh <
abd.salam.shaikh@gmail.com> wrote:

> Thank you Jonas, I am using version *1.2-SNAPSHOT* of Apache Flink to
> leverage the advanced Evictor class.
>
> However, while trying to use FIRE_AND_PURGE I am getting the following
> error:
>
> java.lang.UnsupportedOperationException: Not supported yet.
>     at de.traffic.ui.streaming.WindowCustomTrigger.clear(WindowCustomTrigger.java:51)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:643)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.cleanup(WindowOperator.java:421)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:321)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>     at java.lang.Thread.run(Thread.java:745)
>
> Do we have support for this functionality in *1.2-SNAPSHOT* ?
>
> Thanks.



Re: Expected behaviour of windows

Posted by Abdul Salam Shaikh <ab...@gmail.com>.
Thank you Jonas, I am using version *1.2-SNAPSHOT* of Apache Flink to
leverage the advanced Evictor class.

However, while trying to use FIRE_AND_PURGE I am getting the following
error:

java.lang.UnsupportedOperationException: Not supported yet.
    at de.traffic.ui.streaming.WindowCustomTrigger.clear(WindowCustomTrigger.java:51)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:643)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.cleanup(WindowOperator.java:421)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:321)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
    at java.lang.Thread.run(Thread.java:745)

Do we have support for this functionality in *1.2-SNAPSHOT* ?

Thanks.

On Mon, Jan 23, 2017 at 10:57 AM, Jonas <jo...@huntun.de> wrote:

> The documentation at
> https://flink.apache.org/news/2015/12/04/Introducing-windows.html says:
> > On each event, a trigger can decide to fire (i.e., evaluate), purge
> > (remove the window and discard its content), or fire and then purge the
> > window. A trigger that just fires evaluates the window and keeps it as it
> > is, i.e., all elements remain in the window and are evaluated again when
> > the triggers fires the next time.
>
> So you can choose between *Fire*, *Purge* and *Fire&Purge*. It seems like
> you selected *Fire* but meant to choose *Fire&Purge*.
>
> From what it seems you want a PurgingTrigger. You also didn't state what
> version of Flink you are using :)
>
> -- Jonas




Re: Expected behaviour of windows

Posted by Jonas <jo...@huntun.de>.
The documentation at
https://flink.apache.org/news/2015/12/04/Introducing-windows.html says:
> On each event, a trigger can decide to fire (i.e., evaluate), purge
> (remove the window and discard its content), or fire and then purge the
> window. A trigger that just fires evaluates the window and keeps it as it
> is, i.e., all elements remain in the window and are evaluated again when
> the triggers fires the next time.

So you can choose between *Fire*, *Purge* and *Fire&Purge*. It seems like
you selected *Fire* but meant to choose *Fire&Purge*.

From what it seems you want a PurgingTrigger. You also didn't state what
version of Flink you are using :)

-- Jonas
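
To illustrate the difference between firing and firing with a purge, here is a small plain-Java model of a window buffer. It is a sketch under stated assumptions, not Flink code: Result, TriggerModel, DropTrigger, purging and run are hypothetical names, and the purging decorator only imitates the idea behind a PurgingTrigger by turning every FIRE into FIRE_AND_PURGE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of trigger results; hypothetical names, not Flink API.
final class TriggerResultDemo {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    interface TriggerModel {
        Result onElement(int timeInCycle);
    }

    // Fires, without purging, when a reading is lower than the previous one.
    static final class DropTrigger implements TriggerModel {
        private long lastKnown = 0;

        @Override
        public Result onElement(int timeInCycle) {
            if (lastKnown > timeInCycle) {
                lastKnown = 0;
                return Result.FIRE;
            }
            lastKnown = timeInCycle;
            return Result.CONTINUE;
        }
    }

    // Decorator that upgrades FIRE to FIRE_AND_PURGE and passes the rest on.
    static TriggerModel purging(TriggerModel inner) {
        return t -> {
            Result r = inner.onElement(t);
            return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
        };
    }

    // Feeds the readings into one never-ending window and collects what is
    // emitted each time the trigger fires.
    static List<List<Integer>> run(TriggerModel trigger, List<Integer> input) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> window = new ArrayList<>();
        for (int t : input) {
            window.add(t);
            Result r = trigger.onElement(t);
            if (r != Result.CONTINUE) {
                emitted.add(new ArrayList<>(window)); // evaluate the window
            }
            if (r == Result.FIRE_AND_PURGE) {
                window.clear();                       // discard its content
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(20, 30, 40, 60, 10, 20, 30, 40, 60, 5);
        // FIRE only: the second result still contains the first five readings.
        System.out.println(run(new DropTrigger(), input));
        // FIRE_AND_PURGE: each result contains only its own readings.
        System.out.println(run(purging(new DropTrigger()), input));
    }
}
```

With FIRE alone the second emitted batch has ten readings, which matches the accumulating output reported in the question; with the purging decorator it has five.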



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expected-behaviour-of-windows-tp11200p11205.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.