Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2018/07/23 13:40:11 UTC

Modify EventTimeTrigger only for event-time session windows

Hi all,

I want to be sure about when the EventTimeTrigger.onEventTime() method is
called with event-time session windows.
It returns TriggerResult.FIRE only when the timestamp of the registered
timer equals the max timestamp of the current window:

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

As far as I understand, when EventTimeTrigger is used with event-time
session windows, there's no chance of EventTimeTrigger.onEventTime being
called with time != window.maxTimestamp.
Is that true? If not, could anyone point out the corner case?

I'm asking because I want to register an event-time timer
when my custom trigger receives a special event which signifies the end of
a session.
The timestamp of that timer is not going to be equal to
window.maxTimestamp, and I want to return TriggerResult.FIRE_AND_PURGE in
such a case.
As I also want to purge the content of a window when it expires,
onEventTime should look like this:

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE_AND_PURGE;
	}

With this, MyTrigger.onEventTime returns FIRE_AND_PURGE every time it is called.
That looks quite dangerous, and I'm not sure it is safe.

- Dongwon

Re: Modify EventTimeTrigger only for event-time session windows

Posted by 김동원 <ea...@gmail.com>.
Sorry for the late response; it's due to the time difference between Berlin and Seoul :-)

Are you asking why I'm trying to modify the condition of the ternary operator in the original EventTimeTrigger.onEventTime?

If so, I already answered that in my first message in the thread:
>>> The reason why I'm asking is because I want to register an event-time timer when my custom trigger receives a special event which signifies the end of a session.
>>> The timestamp of the registered timer is not going to be equal to window.maxTimestamp and I want to return TriggerResult.FIRE_AND_PURGE in such a case.
>>> As I also want to purge the content of a window when it expires, onEventTime should look like this:
>>> 
>>> 	@Override
>>> 	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>>> 		return TriggerResult.FIRE_AND_PURGE;
>>> 	}
>>> 
>>> It will return FIRE_AND_PURGE every time MyTrigger.onEventTime is called.
>>> It looks quite dangerous and I'm not quite sure about that.


If that is not enough, let me explain in more detail.
- Users normally send events every 5 minutes, but the interval can become irregular due to manual operations.
- A user session can be ended by an event carrying a special flag that signifies the end of the session.
- Some sessions never end normally, so we want to expire them after a 1-hour timeout. Low latency is not expected in such cases.
- For users who send the special flag, we want to emit the result of the window aggregation as soon as we receive the flag.
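
The 1-hour timeout above is the session gap. As a rough illustration, here is a minimal Flink-free sketch of how one key's events are grouped into session windows, assuming they behave like EventTimeSessionWindows with TimeWindow: each element opens a window [ts, ts + gap), windows that overlap or touch are merged, and the window's max timestamp is end - 1. SessionModel and its methods are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of event-time session windows for one key.
// Assumption: like EventTimeSessionWindows, every element at timestamp ts gets
// the window [ts, ts + gap), and windows that overlap or touch are merged.
final class SessionModel {
    /** Half-open window [start, end), mirroring the shape of Flink's TimeWindow. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that still belongs to the window. */
        long maxTimestamp() {
            return end - 1;
        }
    }

    /** Groups element timestamps (in ms) into merged session windows. */
    static List<Window> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Window> result = new ArrayList<>();
        for (long ts : sorted) {
            Window current = new Window(ts, ts + gapMs);
            if (!result.isEmpty() && result.get(result.size() - 1).end >= current.start) {
                // The new window intersects the previous session: merge them.
                Window previous = result.remove(result.size() - 1);
                current = new Window(previous.start, Math.max(previous.end, current.end));
            }
            result.add(current);
        }
        return result;
    }
}
```

For events 5 minutes apart and a 1-hour gap, all events end up in one session, and that session only expires once the watermark reaches the last event's timestamp plus 1 hour minus 1 ms.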

My streaming pipeline is quite simple as shown below.

// default parallelism is 20
env
  .addSource(consumer)
  .process(jsonParser)
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(...){...})
  .keyBy(_.key)
  .window(EventTimeSessionWindows.withGap(sessionGap))
  .trigger(new EarlyResultEventTimeTrigger[MyEvent](_.isLast))
  .aggregate(aggregator)
  .setParallelism(128)
  .addSink(producer)
  .setParallelism(128)

Initially, when an event with the special flag arrived, I wanted to *immediately* emit the result of the session window's aggregate function.
At that time, I had not considered that events from the same user could arrive at the session window out of order.

So my first custom trigger looked like this:

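import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class EarlyResultEventTimeTrigger[T](eval: (T => Boolean)) extends Trigger[T, TimeWindow] {
  override def onElement(element: T, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp <= ctx.getCurrentWatermark) {
      TriggerResult.FIRE // the watermark has already passed the end of the window
    } else {
      if (eval(element)) {
        TriggerResult.FIRE_AND_PURGE // end-of-session event: emit and clear right away
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp)
        TriggerResult.CONTINUE
      }
    }
  }

  // The remaining methods behave like EventTimeTrigger's.
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp) TriggerResult.FIRE else TriggerResult.CONTINUE
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ctx.deleteEventTimeTimer(window.maxTimestamp)
  override def canMerge(): Boolean = true
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = ctx.registerEventTimeTimer(window.maxTimestamp)
}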

Compared to EventTimeTrigger.onElement, three lines of code are added:

if (eval(element)) {
    TriggerResult.FIRE_AND_PURGE
} 

However, this custom trigger ignores the out-of-orderness of events from the same user.
For example, suppose a user produces the following four events, each written to a different partition of a Kafka topic:
- A@00:00:00 / partition 1
- B@00:05:00 / partition 2 / 5 minutes after A
- C@00:10:00 / partition 3 / 5 minutes after B
- D@00:10:01 with a special flag / partition 4 / 1 second after C
Let's assume that after going through different source-process-assigner task chains and shuffling, the order of C and D is inverted; D arrives at the session window earlier than C.
The above EarlyResultEventTimeTrigger.onElement is going to return FIRE_AND_PURGE as soon as it receives D.
After A, B, and D are fired and purged, C arrives.
In this case, I end up with two different sessions:
- A, B, D
- C
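I could return TriggerResult.FIRE from EarlyResultEventTimeTrigger.onElement instead, but
- it violates exactly-once semantics, since the window fires again when it expires and the session is emitted twice.
- it can significantly increase memory usage, since the window contents are kept until the 1-hour session gap elapses.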
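
To make both failure modes concrete, here is a minimal Flink-free replay of the example above, assuming a single key whose events reach the window in the order A, B, D, C. The EarlyFireReplay class is hypothetical and ignores timestamps, watermarks, and window merging; it only models what each onElement result does to the buffered window contents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free replay of one key's session window.
// Events are processed in arrival order; `flag` is the end-of-session event.
final class EarlyFireReplay {
    /** Models returning FIRE_AND_PURGE from onElement when the flag arrives. */
    static List<List<String>> fireAndPurgeOnFlag(List<String> arrivals, String flag) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String e : arrivals) {
            buffer.add(e);
            if (e.equals(flag)) {
                emitted.add(new ArrayList<>(buffer)); // fire ...
                buffer.clear();                       // ... and purge
            }
        }
        // Whatever arrives later is fired by the end-of-window timer.
        if (!buffer.isEmpty()) emitted.add(buffer);
        return emitted;
    }

    /** Models returning FIRE (no purge) on the flag, then FIRE again when the window expires. */
    static List<List<String>> fireOnFlag(List<String> arrivals, String flag) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String e : arrivals) {
            buffer.add(e);
            if (e.equals(flag)) emitted.add(new ArrayList<>(buffer));
        }
        emitted.add(buffer); // the end-of-window timer fires over the full contents
        return emitted;
    }
}
```

With FIRE_AND_PURGE the session is split into [A, B, D] and [C]; with plain FIRE the early result [A, B, D] is followed by a complete result [A, B, D, C], so downstream sees the same session twice.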

To work around this, I want to tolerate some degree of out-of-orderness before executing FIRE_AND_PURGE.
This can be done by registering a timer instead of returning FIRE_AND_PURGE immediately.
Below is the new custom trigger.

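import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class DelayedEarlyResultEventTimeTrigger[T](eval: (T => Boolean), delay: Long = 0) extends Trigger[T, TimeWindow] {
  override def onElement(element: T, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp <= ctx.getCurrentWatermark) {
      TriggerResult.FIRE
    } else {
      if (eval(element)) {
        // End-of-session event: fire once the watermark reaches timestamp + delay,
        // which leaves room for out-of-order events of the same session.
        ctx.registerEventTimeTimer(timestamp + delay)
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp)
      }
      TriggerResult.CONTINUE
    }
  }

  // Called for the delayed timer above and for the end-of-window timer.
  // With allowed lateness > 0 it is also called at window.maxTimestamp + allowedLateness.
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ctx.deleteEventTimeTimer(window.maxTimestamp)
  override def canMerge(): Boolean = true
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = ctx.registerEventTimeTimer(window.maxTimestamp)
}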

Instead of returning FIRE_AND_PURGE, I register a new event-time timer:

if (eval(element)) {
    ctx.registerEventTimeTimer(timestamp + delay)
}

Please note that the timestamp of the registered timer is not equal to window.maxTimestamp.
That's why I plan to return FIRE_AND_PURGE whenever DelayedEarlyResultEventTimeTrigger.onEventTime is called as below:

override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
}
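
The delayed variant can be checked with a similar replay. This sketch is not Flink code: DelayedTriggerModel, its Event type, and the per-element watermark update are assumptions made for illustration. A real BoundedOutOfOrdernessTimestampExtractor computes the watermark as the max timestamp seen minus the out-of-orderness bound but emits it periodically, and with parallel inputs the window operator's watermark is the minimum over its inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of DelayedEarlyResultEventTimeTrigger for one key.
// Assumptions: the watermark is recomputed after every element as
// (max timestamp seen - maxOutOfOrdernessMs), and a timer fires once watermark >= timer.
final class DelayedTriggerModel {
    static final class Event {
        final String name;
        final long ts;
        final boolean isLast;

        Event(String name, long ts, boolean isLast) {
            this.name = name;
            this.ts = ts;
            this.isLast = isLast;
        }
    }

    static List<List<String>> replay(List<Event> arrivals, long delayMs, long maxOutOfOrdernessMs) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        Long earlyTimer = null; // timer registered at (flag timestamp + delay)
        for (Event e : arrivals) {
            buffer.add(e.name);
            if (e.isLast) earlyTimer = e.ts + delayMs;
            maxSeen = Math.max(maxSeen, e.ts);
            long watermark = maxSeen - maxOutOfOrdernessMs;
            if (earlyTimer != null && earlyTimer <= watermark) {
                emitted.add(new ArrayList<>(buffer)); // onEventTime -> FIRE_AND_PURGE
                buffer.clear();
                earlyTimer = null;
            }
        }
        // End of input: the watermark passes window.maxTimestamp and the rest fires.
        if (!buffer.isEmpty()) emitted.add(new ArrayList<>(buffer));
        return emitted;
    }
}
```

With neither a delay nor an out-of-orderness bound, D's timer fires as soon as D arrives and C is split off as before; with a small delay on top of the bound, C arrives before the watermark reaches D's timer and all four events are emitted together.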

What I want to confirm is that the original EventTimeTrigger.onEventTime is never called except when a session window expires.

I'm not sure whether I understood your question correctly, but I hope this gives you a detailed idea of what I'm trying to figure out.

P.S. We have a large session gap (1 hour) and do not enable allowed lateness.

- Dongwon


On Tue, Jul 24, 2018 at 12:05 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
Out of curiosity, why don't you want to keep it like this?

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

I mean checking for the maxTimestamp().

Best,
Aljoscha

> On 23. Jul 2018, at 16:10, 김동원 <eastcirclek@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> If that is the only case, I need to return TriggerResult.CONTINUE when time > window.maxTimestamp.
> 
> It is very fortunate that onEventTime is not called with time < window.maxTimestamp except for my own timer.
> 
> Thanks a lot for your quick reply.
> 
> Best,
> 
> - Dongwon
> 
> On 23 Jul 2018, at 10:58 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
>> Hi,
>> 
>> If you set an allowed lateness greater than zero, you will get another call to onEventTime() at window.maxTimestamp + allowedLateness.
>> 
>> Does that help answer your question?
>> 
>> Best,
>> Aljoscha



Re: Modify EventTimeTrigger only for event-time session windows

Posted by Aljoscha Krettek <al...@apache.org>.
Out of curiosity, why don't you want to keep it like this?

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

I mean checking for the maxTimestamp().

Best,
Aljoscha



Re: Modify EventTimeTrigger only for event-time session windows

Posted by 김동원 <ea...@gmail.com>.
Hi Aljoscha,

If that is the only case, I need to return TriggerResult.CONTINUE when time > window.maxTimestamp.

It is very fortunate that onEventTime is not called with time < window.maxTimestamp except for my own timer.
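
The three onEventTime rules discussed in this thread can be compared side by side. In this Flink-free sketch, TriggerResultModel and OnEventTimePolicies are hypothetical stand-ins (only the three TriggerResult values used here are modeled), and time is whatever timestamp onEventTime is invoked with for a window whose max timestamp is maxTs.

```java
// Hypothetical stand-in for Flink's TriggerResult (only the values used in this thread).
enum TriggerResultModel { CONTINUE, FIRE, FIRE_AND_PURGE }

// Hypothetical, Flink-free comparison of onEventTime policies for one session window.
final class OnEventTimePolicies {
    /** EventTimeTrigger's rule: fire only on the end-of-window timer. */
    static TriggerResultModel eventTimeTrigger(long time, long maxTs) {
        return time == maxTs ? TriggerResultModel.FIRE : TriggerResultModel.CONTINUE;
    }

    /** Unconditional rule from the first post: every call fires and purges. */
    static TriggerResultModel alwaysFireAndPurge(long time, long maxTs) {
        return TriggerResultModel.FIRE_AND_PURGE;
    }

    /**
     * Refined rule: the delayed early timer (time < maxTs) and the end-of-window
     * timer (time == maxTs) fire and purge; any later call, such as the one at
     * maxTs + allowedLateness, is ignored.
     */
    static TriggerResultModel delayedEarlyResult(long time, long maxTs) {
        return time > maxTs ? TriggerResultModel.CONTINUE : TriggerResultModel.FIRE_AND_PURGE;
    }
}
```

Keeping EventTimeTrigger's equality check would turn the delayed early timer into a no-op (CONTINUE), which is why it cannot be kept as is; the refined rule only ignores calls after maxTs, such as the extra call that appears when allowed lateness is greater than zero.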

Thanks a lot for your quick reply.

Best,

- Dongwon


Re: Modify EventTimeTrigger only for event-time session windows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

If you set an allowed lateness greater than zero, you will get another call to onEventTime() at window.maxTimestamp + allowedLateness.

Does that help answer your question?

Best,
Aljoscha
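
The answer above can be summarized as a small model of which timestamps reach onEventTime for one window. This is a hypothetical, Flink-free sketch (OnEventTimeCalls is not a Flink class), assuming the window operator registers its cleanup timer at window.maxTimestamp + allowedLateness through the same timer service as the trigger, and that timers with the same key, window, and timestamp collapse into one. Windows that are later merged away are not modeled.

```java
import java.util.TreeSet;

// Hypothetical, Flink-free model of the event-time timestamps at which a
// trigger's onEventTime is invoked for one (non-merged) window.
final class OnEventTimeCalls {
    static TreeSet<Long> callTimes(long maxTs, long allowedLateness, long... customTimers) {
        // A set, because equal timers for the same key and window are assumed to be deduplicated.
        TreeSet<Long> times = new TreeSet<>();
        times.add(maxTs);                   // EventTimeTrigger's end-of-window timer
        times.add(maxTs + allowedLateness); // window cleanup timer
        for (long t : customTimers) {
            times.add(t);                   // timers the custom trigger registers itself
        }
        return times;
    }
}
```

With allowedLateness = 0 the cleanup timer coincides with maxTimestamp, so EventTimeTrigger only ever sees time == window.maxTimestamp; with a positive lateness a second, later call arrives, and any timer a custom trigger registers itself shows up as an additional call.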
