Posted to user@flink.apache.org by Srikanth <sr...@gmail.com> on 2016/05/10 15:24:10 UTC
Force triggering events on watermark
Hi,
I read the following in the Flink documentation: "We can explicitly specify a
Trigger to overwrite the default Trigger provided by the WindowAssigner. Note
that specifying a triggers does not add an additional trigger condition but
replaces the current trigger."
So I tested the code below with a count trigger. As I understand it, this
replaces the default watermark-based trigger.
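import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Watermark-based windows need event time (assumed; not shown in the original snippet)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Parser for the timestamp strings (`f` was not defined in the original snippet)
val f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val testStream = env.fromCollection(List(
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 111283, 23),
    ("2016-04-07 13:11:57", 108042, 23),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:11:59", 136505, 4)))
  .assignAscendingTimestamps(b => f.parse(b._1).getTime())
  .map(b => (b._3, b._2)) // (key, value)

testStream.print

val countStream = testStream
  .keyBy(_._1)
  .timeWindow(Time.seconds(20))
  .trigger(CountTrigger.of(3)) // replaces the default event-time trigger
  .fold((0, List[Int]())) { case ((k, r), i) => (i._1, r ++ List(i._2)) }

countStream.print

env.execute()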
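To see why keys 23 and 9 never reach a count of 3, it helps to work out which 20-second window each record falls into. The sketch below is plain Java, not Flink code. It assumes tumbling event-time windows aligned to the epoch (start = timestamp - timestamp % size, Flink's default alignment) and, only to make the result deterministic, reads the timestamps as UTC. Because 20 seconds divides a minute, the boundaries land on the same wall-clock times in any time zone whose UTC offset is a whole number of minutes. The class name `TumblingWindowSketch` is hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Flink-free sketch: which 20 s tumbling event-time window does each record land in?
class TumblingWindowSketch {
    static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("HH:mm:ss");

    // Assumption: timestamps are interpreted as UTC so the result is deterministic.
    static long toEpochMillis(String ts) {
        return LocalDateTime.parse(ts, IN).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Epoch-aligned window start for a non-negative timestamp: round down to a multiple of the size.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Human-readable [start, end) label; the window's last timestamp is end - 1 ms.
    static String windowLabel(String ts, long sizeMillis) {
        long start = windowStart(toEpochMillis(ts), sizeMillis);
        LocalDateTime s = LocalDateTime.ofEpochSecond(start / 1000, 0, ZoneOffset.UTC);
        LocalDateTime e = s.plus(Duration.ofMillis(sizeMillis));
        return "[" + OUT.format(s) + ", " + OUT.format(e) + ")";
    }

    public static void main(String[] args) {
        String[][] records = { // {timestamp, key} from the test data in the thread
            {"2016-04-07 13:11:59", "4"}, {"2016-04-07 13:11:59", "4"},
            {"2016-04-07 13:11:59", "23"}, {"2016-04-07 13:11:57", "23"},
            {"2016-04-07 13:12:00", "9"}, {"2016-04-07 13:12:00", "9"},
            {"2016-04-07 13:11:59", "4"}};
        for (String[] r : records) {
            System.out.println("key " + r[1] + " -> " + windowLabel(r[0], 20_000L));
        }
    }
}
```

Running `main` places every record for keys 4 and 23 in [13:11:40, 13:12:00) and both records for key 9 in [13:12:00, 13:12:20), so keys 23 and 9 each collect only two elements per window.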
The output confirms the documented behavior: the window fires only once a key
has 3 elements in it.
How do I force the leftover records to be emitted once the watermark passes
the end of the window? That is, I want to use the trigger for early
processing but finalize the window based on the watermark.
The output shows that the records for keys 23 and 9 were never emitted by the
window:
(4,157428)
(4,157428)
(23,111283)
(23,108042)
(9,161374)
(9,161374)
(4,136505)
(4,List(157428, 157428, 136505))
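The observed output can be reproduced with a small, Flink-free model of what happens once `CountTrigger.of(3)` replaces the default event-time trigger: each key/window pair counts its elements and, modeled on Flink's `CountTrigger`, returns FIRE (not FIRE_AND_PURGE) and resets its counter at 3, while its event-time callback only ever returns CONTINUE, so no watermark, not even one past the end of every window, makes the remaining windows fire. `CountOnlyWindowModel`, `onElement` and `neverFired` are hypothetical names; the model works in whole seconds of the day and ignores parallelism and Flink's actual processing order.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Flink-free model: keyed 20 s tumbling windows whose default event-time
// trigger has been replaced by a count trigger of 3.
class CountOnlyWindowModel {
    static final int WINDOW_SECONDS = 20;
    static final int MAX_COUNT = 3;

    final Map<String, List<Integer>> contents = new LinkedHashMap<>(); // "key@windowStart" -> values
    final Map<String, Integer> counts = new LinkedHashMap<>();         // count-trigger state
    final Set<String> fired = new LinkedHashSet<>();
    final List<String> emitted = new ArrayList<>();

    void onElement(String time, int key, int value) {
        int ts = LocalTime.parse(time).toSecondOfDay();
        String window = key + "@" + (ts - ts % WINDOW_SECONDS);
        List<Integer> values = contents.computeIfAbsent(window, w -> new ArrayList<>());
        values.add(value);
        int count = counts.merge(window, 1, Integer::sum);
        if (count >= MAX_COUNT) {
            // Modeled on CountTrigger: FIRE (no purge) and reset the counter.
            counts.put(window, 0);
            fired.add(window);
            emitted.add("(" + key + ",List(" + values.stream().map(String::valueOf)
                    .collect(Collectors.joining(", ")) + "))");
        }
    }

    // Deliberately no watermark handling: the count trigger ignores event time,
    // so these windows stay pending no matter how far the watermark advances.
    List<String> neverFired() {
        return contents.keySet().stream()
                .filter(w -> !fired.contains(w))
                .collect(Collectors.toList());
    }
}
```

Feeding the seven test records in order yields a single emitted line, `(4,List(157428, 157428, 136505))`, matching the last line of the posted output, while the key-23 and key-9 windows remain in `neverFired()`.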
Thanks,
Srikanth
Re: Force triggering events on watermark
Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this should work.
On Tue, 10 May 2016 at 19:01 Srikanth <sr...@gmail.com> wrote:
> Yes, that will work.
> I was also trying another route: a "finalize & purge" trigger that wraps a
> nested trigger and
> i) onElement: registers an event-time timer for the end of the window,
> without altering the nested trigger's TriggerResult
> ii) onEventTime: always purges after firing
>
> That should work with CountTrigger and other custom triggers too, right?
Re: Force triggering events on watermark
Posted by Srikanth <sr...@gmail.com>.
Yes, that will work.
I was also trying another route: a "finalize & purge" trigger that wraps a
nested trigger and
i) onElement: registers an event-time timer for the end of the window,
without altering the nested trigger's TriggerResult
ii) onEventTime: always purges after firing
That should work with CountTrigger and other custom triggers too, right?
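import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class FinalizePurgingTrigger<T, W extends Window> extends Trigger<T, W> {
  private final Trigger<T, W> nestedTrigger; // e.g. CountTrigger.of(3)
  public FinalizePurgingTrigger(Trigger<T, W> nestedTrigger) {
    this.nestedTrigger = nestedTrigger;
  }
  @Override
  public TriggerResult onElement(T element, long timestamp, W window,
      TriggerContext ctx) throws Exception {
    // Final firing when the watermark passes the window's end; early firings stay with the nested trigger
    ctx.registerEventTimeTimer(window.maxTimestamp());
    return nestedTrigger.onElement(element, timestamp, window, ctx);
  }
  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
      throws Exception {
    if (time == window.maxTimestamp()) {
      // Watermark passed the end of the window: emit what is left, then purge
      return TriggerResult.FIRE_AND_PURGE;
    }
    // Timers registered by the nested trigger itself: purge after every fire
    TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
    switch (triggerResult) {
      case FIRE:
      case FIRE_AND_PURGE:
        return TriggerResult.FIRE_AND_PURGE;
      default:
        return triggerResult;
    }
  }
  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
      throws Exception {
    return nestedTrigger.onProcessingTime(time, window, ctx);
  }
  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
    nestedTrigger.clear(window, ctx);
  }
}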
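The "finalize & purge" idea can be checked against the test data with a plain-Java model. This is not the Flink `Trigger` API: a nested count trigger of 3 may fire early without purging, and once the watermark reaches a window's last timestamp the window fires one final time and is purged. `FinalizePurgingModel` and its methods are hypothetical names; time is measured in whole seconds of the day, so a window's last timestamp is modeled as end minus one second (Flink uses end minus one millisecond). The model also assumes windows finalized by the same watermark fire in creation order, which Flink does not guarantee.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Flink-free model of a "finalize & purge" trigger wrapped around a count
// trigger of 3, on keyed 20 s tumbling windows. Times are whole seconds of day.
class FinalizePurgingModel {
    static final int WINDOW_SECONDS = 20;
    static final int MAX_COUNT = 3;

    static final class WindowState {
        final int key;
        final int start;
        final List<Integer> contents = new ArrayList<>();
        int count; // state of the nested count trigger

        WindowState(int key, int start) {
            this.key = key;
            this.start = start;
        }

        // Last second that belongs to the window.
        int maxTimestamp() {
            return start + WINDOW_SECONDS - 1;
        }

        String render() {
            return "(" + key + ",List(" + contents.stream().map(String::valueOf)
                    .collect(Collectors.joining(", ")) + "))";
        }
    }

    final Map<String, WindowState> windows = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    void onElement(String time, int key, int value) {
        int ts = LocalTime.parse(time).toSecondOfDay();
        int start = ts - ts % WINDOW_SECONDS;
        WindowState w = windows.computeIfAbsent(key + "@" + start, k -> new WindowState(key, start));
        w.contents.add(value);
        if (++w.count >= MAX_COUNT) {
            // Nested count trigger says FIRE; the result is passed through unchanged (no purge).
            w.count = 0;
            emitted.add(w.render());
        }
    }

    // Every window whose last timestamp the watermark has reached fires once more and is purged.
    void onWatermark(int watermark) {
        List<WindowState> due = windows.values().stream()
                .filter(w -> w.maxTimestamp() <= watermark)
                .sorted(Comparator.comparingInt(WindowState::maxTimestamp))
                .collect(Collectors.toList());
        for (WindowState w : due) {
            emitted.add(w.render());               // FIRE ...
            windows.remove(w.key + "@" + w.start); // ... AND_PURGE: drop contents and trigger state
        }
    }
}
```

With the thread's data, a watermark at 13:11:59 finalizes the key-4 and key-23 windows, and the end-of-input watermark finalizes the key-9 window. Because the nested FIRE is passed through unchanged, key 4 is emitted twice: once early and once more, with the same three elements, when its window is finalized. Whether that is desirable depends on how downstream operators treat early versus final results.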
Srikanth
On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske <fh...@gmail.com> wrote:
> Maybe the last example of this blog post is helpful [1].
>
> Best, Fabian
>
> [1]
> https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
Re: Force triggering events on watermark
Posted by Fabian Hueske <fh...@gmail.com>.
Maybe the last example of this blog post is helpful [1].
Best, Fabian
[1]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
2016-05-10 17:24 GMT+02:00 Srikanth <sr...@gmail.com>: