Posted to user@flink.apache.org by Plamen Paskov <pl...@next-stream.com> on 2017/12/21 12:29:15 UTC

periodic trigger

Hi guys,
I have the following code:

SingleOutputStreamOperator<Event> lastUserSession = env
        .socketTextStream("localhost", 9000, "\n")
        .map(new MapFunction<String, Event>() {
            @Override
            public Event map(String value) throws Exception {
                String[] row = value.split(",");
                return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
            }
        })
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.timestamp;
            }
        })
        .keyBy("userId", "sessionId")
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
        .maxBy("length", false);

lastUserSession
        .timeWindowAll(Time.seconds(60))
        .aggregate(new AverageSessionLengthAcrossAllUsers())
        .print();

What I'm trying to achieve is to calculate the average session length every 10 seconds. The problem is that, because the window length is 60 seconds and a computation is triggered
every 10 seconds, I receive duplicate events in my average calculation method, so the average is not correct. If I move ContinuousProcessingTimeTrigger down, before
AverageSessionLengthAcrossAllUsers(), then it doesn't trigger every 10 seconds.
Any suggestions on how to work around this?

Thanks


Re: periodic trigger

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Sorry for the late response (because of the holiday period).

You didn’t mention lateness previously; that’s why I proposed that solution.

Another approach would be to calculate the max session length per user on the first aggregation level and, at the same time, remember the previously emitted/triggered value. Then, whenever your window is recalculated because a trigger fired, you can compare the new value with the previously emitted one. If they are the same, you do not have to emit anything. If they are different, you have to issue an “update”/“diff” to the global aggregation on the second level. In the case of a simple average of session lengths, the first level could emit “-old_value” and “+new_value” (negative old_value and positive new_value) on each update.
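
Here is a minimal plain-Java sketch of that first-level "diff" logic. It runs without Flink. The `HashMap` stands in for the per-window state that the text suggests keeping in `ProcessWindowFunction.Context#windowState()`. The class name `DiffEmitter`, the method `onFire` and the string key format are hypothetical. The sketch also assumes that session lengths are strictly positive, so a negative number can only mean "retract this old value".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Flink API: tracks the last value emitted per key
// and turns each new firing into "-old_value" / "+new_value" updates.
// ASSUMPTION: session lengths are > 0, so a negative value always means retraction.
class DiffEmitter {
    // Stand-in for per-window state (e.g. ProcessWindowFunction.Context#windowState()).
    private final Map<String, Long> lastEmitted = new HashMap<>();

    // Called on every trigger firing with the current max session length for one key.
    List<Long> onFire(String key, long currentMax) {
        List<Long> updates = new ArrayList<>();
        Long previous = lastEmitted.get(key);
        if (previous != null && previous == currentMax) {
            return updates; // unchanged since the last firing: emit nothing
        }
        if (previous != null) {
            updates.add(-previous); // retract the previously emitted value
        }
        updates.add(currentMax); // publish the new value
        lastEmitted.put(key, currentMax);
        return updates;
    }

    public static void main(String[] args) {
        DiffEmitter emitter = new DiffEmitter();
        System.out.println(emitter.onFire("user1:session1", 10)); // [10]
        System.out.println(emitter.onFire("user1:session1", 10)); // []
        System.out.println(emitter.onFire("user1:session1", 25)); // [-10, 25]
    }
}
```

Inside Flink, `windowState()` is already scoped to the current key and window, so the explicit map key would not be needed there.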

To do this in Flink you could use 

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

and store the previously emitted values in

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context#windowState()

For efficiency reasons, it is best to combine it with a reduce function using this call

org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction<T>, org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<T,R,K,W>)

The reduce function ensures that your ProcessWindowFunction does not have to process all events belonging to the window each time it is triggered; it only has to process the single reduced element.
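
This plain-Java sketch (no Flink dependency) shows why pairing a reduce function with the ProcessWindowFunction helps. Each arriving element is folded into a single stored value, so every firing only has to look at one element. The `Event` fields and the name `MAX_BY_LENGTH` are hypothetical stand-ins for the `MAX_BY_LENGTH_REDUCE_FUNCTION` placeholder in the snippet that follows. On ties the later element wins, which is assumed to mirror `maxBy("length", false)` in the original code.

```java
import java.util.function.BinaryOperator;

// Plain-Java illustration of incremental reduction; not Flink code.
class IncrementalMaxByLength {

    // Hypothetical minimal event type with only the fields used here.
    static final class Event {
        final long userId;
        final String sessionId;
        final long length;

        Event(long userId, String sessionId, long length) {
            this.userId = userId;
            this.sessionId = sessionId;
            this.length = length;
        }
    }

    // Same contract as a reduce function: combine two values into one.
    // On ties the later element (b) wins.
    static final BinaryOperator<Event> MAX_BY_LENGTH =
            (a, b) -> b.length >= a.length ? b : a;

    // Stand-in for the window's reducing state: at most one element is kept.
    private Event reduced;

    void add(Event e) {
        reduced = (reduced == null) ? e : MAX_BY_LENGTH.apply(reduced, e);
    }

    // What the process function would receive on a firing: a single element.
    Event onFire() {
        return reduced;
    }

    public static void main(String[] args) {
        IncrementalMaxByLength window = new IncrementalMaxByLength();
        window.add(new Event(1L, "s1", 5));
        window.add(new Event(1L, "s1", 12));
        window.add(new Event(1L, "s1", 7));
        System.out.println(window.onFire().length); // 12
    }
}
```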

In your case:

        .keyBy("userId", "sessionId")
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
        .reduce(MAX_BY_LENGTH_REDUCE_FUNCTION, MY_FANCY_PROCESS_WINDOW_FUNCTION_WITH_UPDATES);

Of course, the second-level global aggregation would have to understand those “updates”/“diffs” and take them into account, but that should be simple to implement with a custom reduce function/aggregate. You would want the sequence of values 1, 1, 10, -10, 1 to produce an average of 1 (sum 3 over 3 live values), not 0.6 (3 over 5): negative values have to decrease the counter when calculating the average (to aggregate an average value you always need to keep the sum of the values and a counter).

        .timeWindowAll(Time.seconds(60))
        .aggregate(FANCY_AGGREGATE_THAT_HANDLES_UPDATES)
        .print();
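
Below is a plain-Java sketch of such an update-aware average. It has no Flink dependency and is only loosely shaped like Flink's AggregateFunction (createAccumulator / add / getResult / merge); the class and method bodies are hypothetical. As in the text, negative values are treated as retractions, which assumes that real session lengths are strictly positive. The sketch reproduces the 1, 1, 10, -10, 1 example: the update-aware result is 1.0, while a naive average of the same numbers gives 0.6.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of a second-level average that understands "-old/+new" updates.
// ASSUMPTION: real session lengths are strictly positive, so a negative value
// always means "retract the previously emitted |value|".
class UpdateAwareAverage {

    // Accumulator: running sum and running count of the currently live values.
    static final class Acc {
        long sum;
        long count;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static Acc add(long value, Acc acc) {
        acc.sum += value;                 // -old_value subtracts, +new_value adds
        acc.count += value < 0 ? -1 : 1;  // a retraction also removes one element
        return acc;
    }

    static double getResult(Acc acc) {
        return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
    }

    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.sum = a.sum + b.sum;
        merged.count = a.count + b.count;
        return merged;
    }

    public static void main(String[] args) {
        List<Long> updates = Arrays.asList(1L, 1L, 10L, -10L, 1L);
        Acc acc = createAccumulator();
        for (long u : updates) {
            add(u, acc);
        }
        System.out.println(getResult(acc)); // 1.0 (sum 3, count 3)

        // Naive average over the same numbers, ignoring the retraction semantics.
        double naive = updates.stream().mapToLong(Long::longValue).average().orElse(Double.NaN);
        System.out.println(naive); // 0.6
    }
}
```

Keeping sum and count, rather than a running average, is what makes both retraction and `merge` straightforward.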

Hope that helps.

Piotrek

> On 22 Dec 2017, at 14:10, Plamen Paskov <pl...@next-stream.com> wrote:
> 
> I don't think it will solve the problem. If I set ContinuousEventTimeTrigger to 10 seconds and allowedLateness(Time.seconds(60)) (I don't want to discard events from different users that are received later), then I might receive more than one row for a single user, depending on the number of windows created by this user's events. That will make the average computations wrong.


Re: periodic trigger

Posted by Plamen Paskov <pl...@next-stream.com>.
I don't think it will solve the problem. If I set
ContinuousEventTimeTrigger to 10 seconds and
allowedLateness(Time.seconds(60)) (I don't want to discard events from
different users that are received later), then I might receive more than
one row for a single user, depending on the number of windows created by
this user's events. That will make the average computations wrong.


On 22.12.2017 12:10, Piotr Nowojski wrote:
> Ok, I think now I understand your problem.
>
> Wouldn’t it be enough if you changed the last global window to
> something like this:
>
> lastUserSession
>          .timeWindowAll(*Time.seconds(10)*)
>          .aggregate(new AverageSessionLengthAcrossAllUsers())
>          .print();
>
> (As a side note, maybe you should use ContinuousEventTimeTrigger in the
> first window.) This way it will aggregate and calculate the average
> session length of only the last “preview results” of the 60-second user
> windows (emitted every 10 seconds from the first aggregation).
>
> Piotrek


Re: periodic trigger

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Ok, I think now I understand your problem. 

Wouldn’t it be enough if you changed the last global window to something like this:

lastUserSession
        .timeWindowAll(Time.seconds(10))
        .aggregate(new AverageSessionLengthAcrossAllUsers())
        .print();

(As a side note, maybe you should use ContinuousEventTimeTrigger in the first window.) This way it will aggregate and calculate the average session length of only the last “preview results” of the 60-second user windows (emitted every 10 seconds from the first aggregation).
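
This small plain-Java simulation illustrates the suggestion. It assumes that each 60-second user window emits one "preview result" per 10-second firing, and that the global windows group previews by the time they are emitted (processing time). When the same previews go into one 60-second bucket, every user is counted once per firing. With 10-second buckets, each bucket sees only the latest preview per user. The `Preview` type and the `averagePerBucket` helper are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation only (no Flink): averages preview results per tumbling time bucket.
// ASSUMPTION: previews are bucketed by their emission (processing) time.
class TenSecondBuckets {

    // Hypothetical preview emitted by a per-user window on one firing.
    static final class Preview {
        final long emitTimeMs;
        final String user;
        final long maxLength;

        Preview(long emitTimeMs, String user, long maxLength) {
            this.emitTimeMs = emitTimeMs;
            this.user = user;
            this.maxLength = maxLength;
        }
    }

    // Returns bucket start (ms) -> average maxLength of the previews in that bucket.
    static Map<Long, Double> averagePerBucket(List<Preview> previews, long bucketMs) {
        Map<Long, long[]> sumAndCount = new TreeMap<>();
        for (Preview p : previews) {
            long bucketStart = p.emitTimeMs - (p.emitTimeMs % bucketMs);
            long[] acc = sumAndCount.computeIfAbsent(bucketStart, k -> new long[2]);
            acc[0] += p.maxLength;
            acc[1] += 1;
        }
        Map<Long, Double> averages = new TreeMap<>();
        for (Map.Entry<Long, long[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Preview> previews = Arrays.asList(
                new Preview(10_000, "u1", 5), new Preview(10_000, "u2", 15),
                new Preview(20_000, "u1", 8), new Preview(20_000, "u2", 15));
        System.out.println(averagePerBucket(previews, 10_000)); // {10000=10.0, 20000=11.5}
        System.out.println(averagePerBucket(previews, 60_000)); // {0=10.75}
    }
}
```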

Piotrek

> On 21 Dec 2017, at 15:18, Plamen Paskov <pl...@next-stream.com> wrote:
> 
> Imagine a case where I want to run a computation every X seconds over a 1-day window. I want to calculate the average session length for the current day every X seconds. Is there an easy way to achieve that?


Re: periodic trigger

Posted by Plamen Paskov <pl...@next-stream.com>.
Imagine a case where I want to run a computation every X seconds over a
1-day window. I want to calculate the average session length for the
current day every X seconds. Is there an easy way to achieve that?


On 21.12.2017 16:06, Piotr Nowojski wrote:
> Hi,
>
> You defined a tumbling window 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows) 
> of 60 seconds, triggered every 10 seconds. This means that each input 
> element can be processed/averaged up to 6 times (there is no other way 
> if you trigger each window multiple times).
>
> I am not sure what you are trying to achieve, but please refer to the
> documentation about the different window types (tumbling, sliding,
> session); maybe it will clarify things for you:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
>
> If you want to avoid duplicated processing, use either a tumbling window
> with the default trigger (triggering at the end of the window), or use
> session windows.
>
> Piotrek


Re: periodic trigger

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

You defined a tumbling window (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows) of 60 seconds, triggered every 10 seconds. This means that each input element can be processed/averaged up to 6 times (there is no other way if you trigger each window multiple times).
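
This idealized plain-Java model shows where the "up to 6 times" count comes from. It assumes that the 60-second window fires exactly every 10 seconds (at 10 s, 20 s, up to 60 s after its start). It also assumes that each firing sees every element whose timestamp is earlier than the firing time. The method name `firingsSeeing` is hypothetical. The real firing times of ContinuousProcessingTimeTrigger depend on processing time, so this model covers only the arithmetic.

```java
// Idealized model, not Flink code: how many periodic firings of one tumbling
// window include an element with timestamp ts (all times in milliseconds).
// ASSUMPTION: firings happen at windowStart + k * interval for k = 1 .. size / interval.
class EarlyFiringCount {

    static long firingsSeeing(long ts, long windowStart, long size, long interval) {
        long count = 0;
        for (long fire = windowStart + interval; fire <= windowStart + size; fire += interval) {
            if (ts < fire) {
                count++; // the element is already in the window when this firing happens
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long size = 60_000, interval = 10_000;
        System.out.println(firingsSeeing(0, 0, size, interval));      // 6
        System.out.println(firingsSeeing(25_000, 0, size, interval)); // 4
        System.out.println(firingsSeeing(59_999, 0, size, interval)); // 1
    }
}
```

An element at the very start of the window is included in all six firings, so any downstream average that consumes every firing counts it six times.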

I am not sure what you are trying to achieve, but please refer to the documentation about the different window types (tumbling, sliding, session); maybe it will clarify things for you:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners

If you want to avoid duplicated processing, use either a tumbling window with the default trigger (triggering at the end of the window), or use session windows.

Piotrek


> On 21 Dec 2017, at 13:29, Plamen Paskov <pl...@next-stream.com> wrote:
> 
> Hi guys,
> I have the following code:
> 
> SingleOutputStreamOperator<Event> lastUserSession = env
>         .socketTextStream("localhost", 9000, "\n")
>         .map(new MapFunction<String, Event>() {
>             @Override
>             public Event map(String value) throws Exception {
>                 String[] row = value.split(",");
>                 return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>             }
>         })
>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>             @Override
>             public long extractTimestamp(Event element) {
>                 return element.timestamp;
>             }
>         })
>         .keyBy("userId", "sessionId")
>         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>         .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>         .maxBy("length", false);
> 
> lastUserSession
>         .timeWindowAll(Time.seconds(60))
>         .aggregate(new AverageSessionLengthAcrossAllUsers())
>         .print();
> 
> What I'm trying to achieve is to calculate the average session length every 10 seconds. The problem is that, because the window length is 60 seconds and a computation is triggered
> every 10 seconds, I receive duplicate events in my average calculation method, so the average is not correct. If I move ContinuousProcessingTimeTrigger down, before
> AverageSessionLengthAcrossAllUsers(), then it doesn't trigger every 10 seconds.
> Any suggestions on how to work around this?
> 
> Thanks