Posted to user@flink.apache.org by Tomasz Dobrzycki <do...@gmail.com> on 2017/08/17 08:25:20 UTC

Unexpected behaviour of a periodic trigger.

Hi,

I'm working on a custom trigger that is supposed to fire
periodically and at the end of the session window. These are the main
methods of my trigger:

public TriggerResult onElement(Object element, long timestamp,
        TimeWindow window, TriggerContext ctx) throws Exception {
    // Fire at most once per `delay` ms of wall-clock (processing) time,
    // and only when an element arrives.
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastTriggerTime >= this.delay) {
        lastTriggerTime = currentTime;
        return TriggerResult.FIRE;
    } else {
        return TriggerResult.CONTINUE;
    }
}

public TriggerResult onEventTime(long time, TimeWindow window,
        TriggerContext ctx) {
    // Fire when the event-time timer for the end of the window goes off.
    return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}
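A plain-Java model of the throttling decision in onElement() above may help show exactly when it fires. It is a sketch, not a Flink Trigger: ThrottleModel and Decision are hypothetical names, and the current time is passed in by the caller instead of being read from System.currentTimeMillis(), so the logic can be exercised deterministically.

```java
// Hypothetical model of the onElement() throttling logic above; this is
// NOT a Flink Trigger. The clock value is passed in so it can be tested.
enum Decision { FIRE, CONTINUE }

class ThrottleModel {
    private final long delay;     // minimum gap between firings, in ms
    private long lastTriggerTime; // starts at 0, like an unassigned long field

    ThrottleModel(long delay) {
        this.delay = delay;
    }

    // Same comparison as the original onElement(), with currentTime supplied
    // by the caller instead of System.currentTimeMillis().
    Decision onElement(long currentTime) {
        if (currentTime - lastTriggerTime >= delay) {
            lastTriggerTime = currentTime;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }
}

class ThrottleDemo {
    public static void main(String[] args) {
        ThrottleModel model = new ThrottleModel(100);
        for (long t : new long[] {1000L, 1050L, 1100L, 1150L}) {
            System.out.println(t + " -> " + model.onElement(t));
        }
    }
}
```

With a 100 ms delay, elements arriving at 1000, 1050, 1100 and 1150 ms yield FIRE, CONTINUE, FIRE, CONTINUE. Because the check only runs when an element arrives, nothing fires in the gaps between elements.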

When I use this trigger in my main processing method, I'm getting
unexpected behaviour. This is how I use it:

// MAIN PROCESSING
WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream
        .map(new ParseEvent())
        .filter(new Filter())
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) {
                    @Override
                    public long extractTimestamp(EventTags event) {
                        return event.receivedAt;
                    }
                })
        .keyBy("streamKeys")
        .window(EventTimeSessionWindows.withGap(Time.minutes(5)));

// WARNING! This has to go before the periodically triggered metrics;
// if it comes second, Flink will trigger it periodically as well.
DataStream<String> rawEvents = sessionWindow
        .reduce(new CollectRawData())
        .map(new ParseRawData());

DataStream<String> metrics = sessionWindow
        .trigger(SessionTrigger.every(Time.milliseconds(2)))
        .apply(new ExtractMetrics());


This works as expected: rawEvents is calculated when the session
window is completed, and metrics is calculated periodically and at the
window's end. But if I swap the order of rawEvents and metrics (which,
in my mind, should make no difference), rawEvents is also triggered
periodically. Is this expected to work this way? I'm not assigning the
periodic trigger to rawEvents. Thanks for your help.

Kind Regards,
Tomasz

Re: Unexpected behaviour of a periodic trigger.

Posted by Tomasz Dobrzycki <do...@gmail.com>.
Hi Tony,

Thank you for this thorough explanation. Helps a lot!

Kind Regards,
Tomasz

Re: Unexpected behaviour of a periodic trigger.

Posted by Tony Wei <to...@gmail.com> on 23 August 2017 at 11:30.
Hi Tomasz,

Actually, window() does not create a real operator that is shared by the
operators created by reduce() and apply().
Flink implements the WindowOperator by binding together the settings from
window(), trigger() and evictor() with the WindowFunction.
It is more like the upstream operator sending elements to two downstream
operators, each of which creates its own window state.
For more details, you can refer to this blog post
(https://flink.apache.org/news/2015/12/04/Introducing-windows.html)

Therefore, my modified version does no more windowing work than yours:
both create two separate window operators.
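A toy plain-Java sketch of the fan-out described above, assuming two downstream window operators named after the thread's rawEvents and metrics branches. WindowOperatorModel and FanOutDemo are hypothetical, not Flink classes, and each operator simply keeps a list of timestamps per key. That is a simplification: a Flink reduce() window keeps one reduced value per window rather than every element.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (not Flink code): the upstream operator forwards
// every element to each downstream window operator, and each downstream
// operator keeps its own per-key window state.
class WindowOperatorModel {
    final String name;
    final Map<String, List<Long>> stateByKey = new HashMap<>();

    WindowOperatorModel(String name) {
        this.name = name;
    }

    void processElement(String key, long timestamp) {
        stateByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(timestamp);
    }

    int bufferedElements() {
        int n = 0;
        for (List<Long> buffer : stateByKey.values()) {
            n += buffer.size();
        }
        return n;
    }
}

class FanOutDemo {
    static List<WindowOperatorModel> run(String[] keys, long[] timestamps) {
        List<WindowOperatorModel> downstream = new ArrayList<>();
        downstream.add(new WindowOperatorModel("rawEvents"));
        downstream.add(new WindowOperatorModel("metrics"));
        for (int i = 0; i < keys.length; i++) {
            // Fan-out: each downstream operator receives its own copy.
            for (WindowOperatorModel op : downstream) {
                op.processElement(keys[i], timestamps[i]);
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<WindowOperatorModel> ops =
                run(new String[] {"a", "b", "a"}, new long[] {1L, 2L, 3L});
        for (WindowOperatorModel op : ops) {
            System.out.println(op.name + " buffers " + op.bufferedElements() + " elements");
        }
    }
}
```

Both operators end up holding their own state for the same input, which is why moving .window() into each branch should not add windowing work compared with the original code.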

Regards,
Tony Wei

Re: Unexpected behaviour of a periodic trigger.

Posted by Tomasz Dobrzycki <do...@gmail.com> on 2017-08-23 18:11 GMT+08:00.
Hi Tony,

Won't that increase the amount of processing Flink has to do? It would
have to window twice, right?

Thanks,
Tomasz

Re: Unexpected behaviour of a periodic trigger.

Posted by Tony Wei <to...@gmail.com> on 23 August 2017 at 11:02.
Hi Tomasz,

In my opinion, you should move the .window() call down into each of the two
DataStreams (i.e. .window().reduce().map() for rawEvents, and likewise for
metrics). That ensures they won't share the same WindowedStream object.
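A toy plain-Java model of this suggestion: each branch calls window() on the keyed stream itself, so calling trigger() on one branch cannot affect the other. KeyedStreamModel, WindowBuilderModel and build() are hypothetical names, not Flink API; in the real job each branch would call .window(EventTimeSessionWindows.withGap(Time.minutes(5))) on the keyed stream separately, and build() stands in for reduce()/apply().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (not Flink classes) of calling window() once per
// branch: every window() call returns a NEW builder object.
class KeyedStreamModel {
    final List<String> operators = new ArrayList<>(); // "name:trigger" per operator

    WindowBuilderModel window() {
        // "EventTimeTrigger" is only a label for the assigner's default trigger.
        return new WindowBuilderModel(this, "EventTimeTrigger");
    }
}

class WindowBuilderModel {
    private final KeyedStreamModel source;
    private String trigger;

    WindowBuilderModel(KeyedStreamModel source, String defaultTrigger) {
        this.source = source;
        this.trigger = defaultTrigger;
    }

    // Still "set the field and return this", but only this branch's builder changes.
    WindowBuilderModel trigger(String newTrigger) {
        this.trigger = newTrigger;
        return this;
    }

    // Stands in for reduce()/apply(): registers an operator with the current trigger.
    String build(String name) {
        String op = name + ":" + trigger;
        source.operators.add(op);
        return op;
    }
}

class SeparateWindowDemo {
    public static void main(String[] args) {
        KeyedStreamModel keyed = new KeyedStreamModel();
        // metrics is built first: the order that caused trouble with a shared builder
        String metrics = keyed.window().trigger("SessionTrigger").build("metrics");
        String rawEvents = keyed.window().build("rawEvents");
        System.out.println(metrics);   // metrics:SessionTrigger
        System.out.println(rawEvents); // rawEvents:EventTimeTrigger
        System.out.println(keyed.operators.size() + " window operators");
    }
}
```

Even with metrics declared first, rawEvents keeps the default trigger, and the job still has exactly two window operators.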

Regards,
Tony Wei

Re: Unexpected behaviour of a periodic trigger.

Posted by Tomasz Dobrzycki <do...@gmail.com> on 2017-08-23 17:51 GMT+08:00.
Hi Tony,

Thank you for your answer; it definitely helps me understand this
situation.
Is there a reliable way to split the stream into 2 outputs that
avoids this behaviour? Eventually I want 2 sinks that output
different data: one being just a copy of the stream, but organised in
session windows, and the other being metrics which I derive from the
data itself.

Thanks,
Tomasz

Re: Unexpected behaviour of a periodic trigger.

Posted by 魏偉哲 <to...@gmail.com> on 23 August 2017 at 10:32.
Hi Tomasz,

I think this is because .window() is lazy.
It just creates a WindowedStream object; it does not create a corresponding
operator.
The operator is created only when you call .reduce() or .apply().

rawEvents and metrics actually share the same WindowedStream object to
create their own operators.
You can see this in WindowedStream.trigger(): it only sets
this.trigger = trigger and then returns itself.
Because of this, when you used the same object to create the operator for
rawEvents after calling .trigger() for metrics, rawEvents got the same
WindowAssigner and the same (periodic) Trigger as well.
That's why changing the order changed the behavior.
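To make the order dependence concrete, here is a toy plain-Java model of the explanation above. The classes (OperatorModel, WindowedStreamModel, SharedWindowDemo) are hypothetical and not Flink's real code; "EventTimeTrigger" is only a label for the default trigger of event-time session windows, and "SessionTrigger" for the custom periodic one. build() stands in for reduce()/apply().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; these are NOT Flink classes. The builder only
// records settings, and an "operator" is created, with a snapshot of the
// settings current at that moment, when build() (reduce()/apply()) is called.
class OperatorModel {
    final String name;
    final String trigger;

    OperatorModel(String name, String trigger) {
        this.name = name;
        this.trigger = trigger;
    }
}

class WindowedStreamModel {
    private String trigger;                  // mutable, like the builder's field
    private final List<OperatorModel> graph; // stands in for the job graph

    WindowedStreamModel(String defaultTrigger, List<OperatorModel> graph) {
        this.trigger = defaultTrigger;
        this.graph = graph;
    }

    // The pattern described above: set the field, then return this.
    WindowedStreamModel trigger(String newTrigger) {
        this.trigger = newTrigger;
        return this;
    }

    // Stands in for reduce()/apply(): the operator copies the current trigger.
    OperatorModel build(String name) {
        OperatorModel op = new OperatorModel(name, trigger);
        graph.add(op);
        return op;
    }
}

class SharedWindowDemo {
    // Builds both branches from ONE shared builder; returns rawEvents' operator.
    static OperatorModel rawEventsOperator(boolean rawEventsFirst) {
        List<OperatorModel> graph = new ArrayList<>();
        WindowedStreamModel sessionWindow =
                new WindowedStreamModel("EventTimeTrigger", graph);
        if (rawEventsFirst) {
            OperatorModel raw = sessionWindow.build("rawEvents");
            sessionWindow.trigger("SessionTrigger").build("metrics");
            return raw;
        }
        sessionWindow.trigger("SessionTrigger").build("metrics");
        return sessionWindow.build("rawEvents");
    }

    public static void main(String[] args) {
        System.out.println("rawEvents first:  " + rawEventsOperator(true).trigger);
        System.out.println("rawEvents second: " + rawEventsOperator(false).trigger);
    }
}
```

When rawEvents is built first it keeps the default trigger; when it is built second it picks up SessionTrigger, matching the behaviour reported in the original question.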

Hope this will help you.

Regards,
Tony Wei
