Posted to user@flink.apache.org by Ori Popowski <or...@gmail.com> on 2020/10/06 12:44:08 UTC

Is it possible that late events are processed before the window?

I have a job with an event-time session window with a 30-minute gap.

I output late events to a side output, where I have a tumbling
processing-time window of 30 minutes.

I observe that the late events are written to storage before the "main"
events.

I wanted to know if it's normal before digging into the code and debugging
the problem.

Thanks

Re: Is it possible that late events are processed before the window?

Posted by Ori Popowski <or...@gmail.com>.
Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Ori,
>
> you are right. Events are being sent down the side output for late events
> if the event's timestamp + the allowed lateness is smaller than the current
> watermark. These events are directly seen by downstream operators which
> consume the side output for late events.
>
> Cheers,
> Till

Re: Is it possible that late events are processed before the window?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ori,

you are right. Events are being sent down the side output for late events
if the event's timestamp + the allowed lateness is smaller than the current
watermark. These events are directly seen by downstream operators which
consume the side output for late events.

Cheers,
Till
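
The rule described above can be illustrated with a minimal sketch in plain
Scala (no Flink dependency). It rests on an assumption about the window
operator's internals: that lateness is decided per window, so an event is
treated as late once the watermark has reached the last timestamp of the
window it would be assigned to, plus the allowed lateness. The names
LatenessSketch, Window, sessionWindowFor and isWindowLate are hypothetical
and are not part of the Flink API.

```scala
// Simplified model of the lateness check; NOT Flink's actual code.
// All names here are hypothetical and exist only for illustration.
object LatenessSketch {

  /** Event-time window covering [start, end), in milliseconds. */
  final case class Window(start: Long, end: Long) {
    /** Largest timestamp that still belongs to this window. */
    def maxTimestamp: Long = end - 1
  }

  /** Window that a single event opens under a session gap: [ts, ts + gap). */
  def sessionWindowFor(timestampMs: Long, gapMs: Long): Window =
    Window(timestampMs, timestampMs + gapMs)

  /** Assumed rule: late once the watermark has reached maxTimestamp + allowed lateness. */
  def isWindowLate(window: Window, allowedLatenessMs: Long, watermarkMs: Long): Boolean =
    window.maxTimestamp + allowedLatenessMs <= watermarkMs

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val window = sessionWindowFor(timestampMs = 0L, gapMs = 30 * minute)

    // Watermark at 10 min: the session can still grow, so the event is buffered.
    println(isWindowLate(window, allowedLatenessMs = 0L, watermarkMs = 10 * minute)) // false
    // Watermark at 31 min: the window has expired, so the event counts as late.
    println(isWindowLate(window, allowedLatenessMs = 0L, watermarkMs = 31 * minute)) // true
  }
}
```

In this model a late event is forwarded to the side output as soon as it
arrives, without waiting for any window to fire, which is consistent with
late output showing up before the output of the main window.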

On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski <or...@gmail.com> wrote:

> After creating a toy example I think that I've got the concept of
> lateDataOutput wrong.
>
> It seems that the lateDataSideOutput has nothing to do with windowing;
> when events arrive late they'll just go straight to the side output, and
> there can never be any window firing of the main flow for that specific key.

Re: Is it possible that late events are processed before the window?

Posted by Ori Popowski <or...@gmail.com>.
After creating a toy example I think that I've got the concept of
lateDataOutput wrong.

It seems that the lateDataSideOutput has nothing to do with windowing; when
events arrive late they'll just go straight to the side output, and there
can never be any window firing of the main flow for that specific key.

On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski <or...@gmail.com> wrote:

> I've made an experiment where I use an evictor on the main window (not the
> late one), only to write a debug file when the window fires (I don't
> actually evict events, I've made it so I can write a debug object the
> moment the window finishes).
>
> I can see that indeed the late data window fires before the main window,
> since the mentioned debug file does not exist, but late events _do_ exist
> in the destination.
>
> Writing this debug object in the evictor eliminates potential problems
> that might be due to logic in the process function, and it proves that the
> window of the late events indeed fires before the main window.
>
> Here's an outline of my job:
>
> val windowedStream = senv
>   .addSource(kafkaSource)
>   ... // some operators
>   // like BoundedOutOfOrderness, but ignores future timestamps
>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>   ... // some more operators
>   .keyingBy { case (meta, _) => meta.toPath }
>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
>   .sideOutputLateData(lateDataTag)
>   .process(new ProcessSession(sessionPlayback, config))
> windowedStream
>   .map(new SerializeSession(sessionPlayback))
>   .addSink(sink)
> windowedStream
>   .getSideOutput(lateDataTag)
>   .keyingBy { case (meta, _) => meta.toPath }
>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
>   .process(new ProcessSession(sessionPlayback, config, true))
>   .map(new SerializeSession(sessionPlayback, late = true))
>
> So, to repeat the question, is that normal? And if not - how can I fix
> this?
>
> Thanks

Re: Is it possible that late events are processed before the window?

Posted by Ori Popowski <or...@gmail.com>.
I've made an experiment where I use an evictor on the main window (not the
late one), only to write a debug file when the window fires (I don't
actually evict events, I've made it so I can write a debug object the
moment the window finishes).

I can see that indeed the late data window fires before the main window,
since the mentioned debug file does not exist, but late events _do_ exist
in the destination.

Writing this debug object in the evictor eliminates potential problems that
might be due to logic in the process function, and it proves that the
window of the late events indeed fires before the main window.

Here's an outline of my job:

val windowedStream = senv
  .addSource(kafkaSource)
  ... // some operators
  // like BoundedOutOfOrderness, but ignores future timestamps
  .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
  ... // some more operators
  .keyingBy { case (meta, _) => meta.toPath }
  .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
  .sideOutputLateData(lateDataTag)
  .process(new ProcessSession(sessionPlayback, config))
windowedStream
  .map(new SerializeSession(sessionPlayback))
  .addSink(sink)
windowedStream
  .getSideOutput(lateDataTag)
  .keyingBy { case (meta, _) => meta.toPath }
  .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
  .process(new ProcessSession(sessionPlayback, config, true))
  .map(new SerializeSession(sessionPlayback, late = true))

So, to repeat the question, is that normal? And if not - how can I fix this?

Thanks
