You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2018/05/11 08:55:00 UTC

Late data before window end is even close

I don't understand why I'm getting some data discarded as late on my Flink
stream job a long time before the window even closes.

I can not be 100% sure, but to me it seems like the kafka consumer is
basically causing the data to be dropped as "late", not the window. I
didn't expect this to ever happen?

I have a Flink stream job that gathers distinct values using a 24-hour
window. It reads the data from Kafka, using
a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
synchronize watermarks accross all kafka partitions. The maxOutOfOrderness
of the extractor is set to 10 seconds.

I have also enabled allowedLateness with 1 minute lateness on the 24-hour
window:

.timeWindow(Time.days(1))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateDataTag)
.reduce(new DistinctFunction())

I have used accumulators to see that there is some late data. I have had
multiple occurrences of those.

Now focusing on a particular case that I was investigating more closely.
Around ~12:15 o-clock my late data accumulator started showing that 1
message had been late. That's in the middle of the time window – so why
would this happen? I would expect late data to be discarded only sometime
after 00:01 if some data is arriving late for the window that just closed
at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.

To analyze the timestamps I read all messages in sequence separately from
each kafka partition and calculated the difference in timestamps between
consecutive messages. I had had exactly one message categorized as late by
Flink in this case, and at the time i was using maxOutOfOrderness = 5
seconds. I found exactly one message in one kafka partition where the
timestamp difference between messages was 5 seconds (they were out of order
by 5 s), which makes me wonder, did Flink drop the event as late because it
violated maxOutOfOrderness? Have I misunderstood the concept of late data
somehow? I only expected late data to happen on window operations. I would
expect kafka consumer to pass "late" messages onward even though watermark
doesn't change.

Thank you very much if you can find the time to look at this!

Re: Late data before window end is even close

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for reporting back and the debugging advice!

Best, Fabian

2018-06-08 9:00 GMT+02:00 Juho Autio <ju...@rovio.com>:

> Flink was NOT at fault. Turns out our Kafka producer had OS level clock
> sync problems :(
>
> Because of that, our Kafka occasionally had some messages in between with
> an incorrect timestamp. In practice they were about 7 days older than they
> should.
>
> I'm really sorry for wasting your time on this. But thank you once more
> for taking the time to answer.
>
> For any similar case, I would first advise user to extra carefully compare
> the actual timestamps of their input data. For me it was helpful to make
> this change in my Flink job: for late data output, include both processing
> time (DateTime.now()) along with the event time (original timestamp).
>
> On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Thanks for correcting me Piotr. I didn't look close enough at the code.
>> With the presently implemented logic, a record should not be emitted to a
>> side output if its window wasn't closed yet.
>>
>> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>>
>>> Generally speaking best practise is always to simplify your program as
>>> much as possible to narrow down the scope of the search. Replace data
>>> source with statically generated events, remove unnecessary components Etc.
>>> Either such process help you figure out what’s wrong on your own and if
>>> not, if you share us such minimal program that reproduces the issue, it
>>> will allow  us to debug it.
>>>
>>> Piotrek
>>>
>>>
>>> On 11 May 2018, at 13:54, Juho Autio <ju...@rovio.com> wrote:
>>>
>>> Thanks for that code snippet, I should try it out to simulate my DAG..
>>> If any suggestions how to debug futher what's causing late data on a
>>> production stream job, please let me know.
>>>
>>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <piotr@data-artisans.com
>>> > wrote:
>>>
>>>> Hey,
>>>>
>>>> Actually I think Fabian initial message was incorrect. As far as I can
>>>> see in the code of WindowOperator (last lines of org.apache.flink.streaming.
>>>> runtime.operators.windowing.WindowOperator#processElement ), the
>>>> element is sent to late side output if it is late AND it wasn’t assigned to
>>>> any of the existing windows (because they were late as well). In other
>>>> words, it should work as you Juho are wishing: element should be marked as
>>>> late once they are overdue/late for the window after one full day.
>>>>
>>>> I have tested it and it works as expected. Following program:
>>>>
>>>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>>>
>>>> Prints only ONE number to the standard err:
>>>>
>>>> > 1394
>>>>
>>>> And there is nothing on the side output.
>>>>
>>>> Piotrek
>>>>
>>>> On 11 May 2018, at 12:32, Juho Autio <ju...@rovio.com> wrote:
>>>>
>>>> Thanks. What I still don't get is why my message got filtered in the
>>>> first place. Even if the allowed lateness filtering would be done "on the
>>>> window", data should not be dropped as late if it's not in fact late by
>>>> more than the allowedLateness setting.
>>>>
>>>> Assuming that these conditions hold:
>>>> - messages (and thus the extracted timestamps) were not out of order by
>>>> more than 5 secods (as far as I didn't make any mistake in my
>>>> partition-level analysis)
>>>> - allowedLateness=1 minute
>>>> - watermarks are assigned on kafka consumer meaning that they are
>>>> synchronized across all partitions
>>>>
>>>> I don't see how the watermark could have ever been more than 5 seconds
>>>> further when the message arrives on the isElementLate filter. Do you have
>>>> any idea on this? Is there some existing test that simulates out of order
>>>> input to flink's kafka consumer? I could try to build a test case based on
>>>> that to possibly reproduce my problem. I'm not sure how to gather enough
>>>> debug information on the production stream so that it would clearly show
>>>> the watermarks, how they progressed on each kafka partition & later in the
>>>> chain in case isElementLate filters something.
>>>>
>>>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> Thanks for bringing up this topic! I share your intuition.
>>>>> IMO, records should only be filtered out and send to a side output if
>>>>> any of the windows they would be assigned to is closed already.
>>>>>
>>>>> I had a look into the code and found that records are filtered out as
>>>>> late based on the following condition:
>>>>>
>>>>> protected boolean isElementLate(StreamRecord<IN> element){
>>>>>    return (windowAssigner.isEventTime()) &&
>>>>>       (element.getTimestamp() + allowedLateness <=
>>>>> internalTimerService.currentWatermark());
>>>>> }
>>>>>
>>>>>
>>>>> This code shows that your analysis is correct.
>>>>> Records are filtered out based on their timestamp and the current
>>>>> watermark, even though they arrive before the window is closed.
>>>>>
>>>>> OTOH, filtering out records based on the window they would end up in
>>>>> can also be tricky if records are assigned to multiple windows (e.g.,
>>>>> sliding windows).
>>>>> In this case, a side-outputted records could still be in some windows
>>>>> and not in others.
>>>>>
>>>>> @Aljoscha (CC) Might have an explanation for the current behavior.
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>>
>>>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <ju...@rovio.com>:
>>>>>
>>>>>> I don't understand why I'm getting some data discarded as late on my
>>>>>> Flink stream job a long time before the window even closes.
>>>>>>
>>>>>> I can not be 100% sure, but to me it seems like the kafka consumer is
>>>>>> basically causing the data to be dropped as "late", not the window. I
>>>>>> didn't expect this to ever happen?
>>>>>>
>>>>>> I have a Flink stream job that gathers distinct values using a
>>>>>> 24-hour window. It reads the data from Kafka, using
>>>>>> a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>> synchronize watermarks accross all kafka partitions. The maxOutOfOrderness
>>>>>> of the extractor is set to 10 seconds.
>>>>>>
>>>>>> I have also enabled allowedLateness with 1 minute lateness on the
>>>>>> 24-hour window:
>>>>>>
>>>>>> .timeWindow(Time.days(1))
>>>>>> .allowedLateness(Time.minutes(1))
>>>>>> .sideOutputLateData(lateDataTag)
>>>>>> .reduce(new DistinctFunction())
>>>>>>
>>>>>> I have used accumulators to see that there is some late data. I have
>>>>>> had multiple occurrences of those.
>>>>>>
>>>>>> Now focusing on a particular case that I was investigating more
>>>>>> closely. Around ~12:15 o-clock my late data accumulator started showing
>>>>>> that 1 message had been late. That's in the middle of the time window – so
>>>>>> why would this happen? I would expect late data to be discarded only
>>>>>> sometime after 00:01 if some data is arriving late for the window that just
>>>>>> closed at 00:00, and doesn't get emitted as part of 1 minute
>>>>>> allowedLateness.
>>>>>>
>>>>>> To analyze the timestamps I read all messages in sequence separately
>>>>>> from each kafka partition and calculated the difference in timestamps
>>>>>> between consecutive messages. I had had exactly one message categorized as
>>>>>> late by Flink in this case, and at the time i was using maxOutOfOrderness
>>>>>> = 5 seconds. I found exactly one message in one kafka partition where the
>>>>>> timestamp difference between messages was 5 seconds (they were out of order
>>>>>> by 5 s), which makes me wonder, did Flink drop the event as late because it
>>>>>> violated maxOutOfOrderness? Have I misunderstood the concept of late
>>>>>> data somehow? I only expected late data to happen on window operations. I
>>>>>> would expect kafka consumer to pass "late" messages onward even though
>>>>>> watermark doesn't change.
>>>>>>
>>>>>> Thank you very much if you can find the time to look at this!
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>

Re: Late data before window end is even close

Posted by Juho Autio <ju...@rovio.com>.
Flink was NOT at fault. Turns out our Kafka producer had OS level clock
sync problems :(

Because of that, our Kafka occasionally had some messages in between with
an incorrect timestamp. In practice they were about 7 days older than they
should.

I'm really sorry for wasting your time on this. But thank you once more for
taking the time to answer.

For any similar case, I would first advise user to extra carefully compare
the actual timestamps of their input data. For me it was helpful to make
this change in my Flink job: for late data output, include both processing
time (DateTime.now()) along with the event time (original timestamp).

On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Thanks for correcting me Piotr. I didn't look close enough at the code.
> With the presently implemented logic, a record should not be emitted to a
> side output if its window wasn't closed yet.
>
> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Generally speaking best practise is always to simplify your program as
>> much as possible to narrow down the scope of the search. Replace data
>> source with statically generated events, remove unnecessary components Etc.
>> Either such process help you figure out what’s wrong on your own and if
>> not, if you share us such minimal program that reproduces the issue, it
>> will allow  us to debug it.
>>
>> Piotrek
>>
>>
>> On 11 May 2018, at 13:54, Juho Autio <ju...@rovio.com> wrote:
>>
>> Thanks for that code snippet, I should try it out to simulate my DAG.. If
>> any suggestions how to debug futher what's causing late data on a
>> production stream job, please let me know.
>>
>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hey,
>>>
>>> Actually I think Fabian initial message was incorrect. As far as I can
>>> see in the code of WindowOperator (last lines of org.apache.flink.streaming.
>>> runtime.operators.windowing.WindowOperator#processElement ), the
>>> element is sent to late side output if it is late AND it wasn’t assigned to
>>> any of the existing windows (because they were late as well). In other
>>> words, it should work as you Juho are wishing: element should be marked as
>>> late once they are overdue/late for the window after one full day.
>>>
>>> I have tested it and it works as expected. Following program:
>>>
>>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>>
>>> Prints only ONE number to the standard err:
>>>
>>> > 1394
>>>
>>> And there is nothing on the side output.
>>>
>>> Piotrek
>>>
>>> On 11 May 2018, at 12:32, Juho Autio <ju...@rovio.com> wrote:
>>>
>>> Thanks. What I still don't get is why my message got filtered in the
>>> first place. Even if the allowed lateness filtering would be done "on the
>>> window", data should not be dropped as late if it's not in fact late by
>>> more than the allowedLateness setting.
>>>
>>> Assuming that these conditions hold:
>>> - messages (and thus the extracted timestamps) were not out of order by
>>> more than 5 secods (as far as I didn't make any mistake in my
>>> partition-level analysis)
>>> - allowedLateness=1 minute
>>> - watermarks are assigned on kafka consumer meaning that they are
>>> synchronized across all partitions
>>>
>>> I don't see how the watermark could have ever been more than 5 seconds
>>> further when the message arrives on the isElementLate filter. Do you have
>>> any idea on this? Is there some existing test that simulates out of order
>>> input to flink's kafka consumer? I could try to build a test case based on
>>> that to possibly reproduce my problem. I'm not sure how to gather enough
>>> debug information on the production stream so that it would clearly show
>>> the watermarks, how they progressed on each kafka partition & later in the
>>> chain in case isElementLate filters something.
>>>
>>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Thanks for bringing up this topic! I share your intuition.
>>>> IMO, records should only be filtered out and send to a side output if
>>>> any of the windows they would be assigned to is closed already.
>>>>
>>>> I had a look into the code and found that records are filtered out as
>>>> late based on the following condition:
>>>>
>>>> protected boolean isElementLate(StreamRecord<IN> element){
>>>>    return (windowAssigner.isEventTime()) &&
>>>>       (element.getTimestamp() + allowedLateness <=
>>>> internalTimerService.currentWatermark());
>>>> }
>>>>
>>>>
>>>> This code shows that your analysis is correct.
>>>> Records are filtered out based on their timestamp and the current
>>>> watermark, even though they arrive before the window is closed.
>>>>
>>>> OTOH, filtering out records based on the window they would end up in
>>>> can also be tricky if records are assigned to multiple windows (e.g.,
>>>> sliding windows).
>>>> In this case, a side-outputted records could still be in some windows
>>>> and not in others.
>>>>
>>>> @Aljoscha (CC) Might have an explanation for the current behavior.
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>>
>>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <ju...@rovio.com>:
>>>>
>>>>> I don't understand why I'm getting some data discarded as late on my
>>>>> Flink stream job a long time before the window even closes.
>>>>>
>>>>> I can not be 100% sure, but to me it seems like the kafka consumer is
>>>>> basically causing the data to be dropped as "late", not the window. I
>>>>> didn't expect this to ever happen?
>>>>>
>>>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>>>> window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor
>>>>> on the kafka consumer to synchronize watermarks accross all kafka
>>>>> partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>>>>
>>>>> I have also enabled allowedLateness with 1 minute lateness on the
>>>>> 24-hour window:
>>>>>
>>>>> .timeWindow(Time.days(1))
>>>>> .allowedLateness(Time.minutes(1))
>>>>> .sideOutputLateData(lateDataTag)
>>>>> .reduce(new DistinctFunction())
>>>>>
>>>>> I have used accumulators to see that there is some late data. I have
>>>>> had multiple occurrences of those.
>>>>>
>>>>> Now focusing on a particular case that I was investigating more
>>>>> closely. Around ~12:15 o-clock my late data accumulator started showing
>>>>> that 1 message had been late. That's in the middle of the time window – so
>>>>> why would this happen? I would expect late data to be discarded only
>>>>> sometime after 00:01 if some data is arriving late for the window that just
>>>>> closed at 00:00, and doesn't get emitted as part of 1 minute
>>>>> allowedLateness.
>>>>>
>>>>> To analyze the timestamps I read all messages in sequence separately
>>>>> from each kafka partition and calculated the difference in timestamps
>>>>> between consecutive messages. I had had exactly one message categorized as
>>>>> late by Flink in this case, and at the time i was using maxOutOfOrderness
>>>>> = 5 seconds. I found exactly one message in one kafka partition where the
>>>>> timestamp difference between messages was 5 seconds (they were out of order
>>>>> by 5 s), which makes me wonder, did Flink drop the event as late because it
>>>>> violated maxOutOfOrderness? Have I misunderstood the concept of late
>>>>> data somehow? I only expected late data to happen on window operations. I
>>>>> would expect kafka consumer to pass "late" messages onward even though
>>>>> watermark doesn't change.
>>>>>
>>>>> Thank you very much if you can find the time to look at this!
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>

Re: Late data before window end is even close

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for correcting me Piotr. I didn't look close enough at the code.
With the presently implemented logic, a record should not be emitted to a
side output if its window wasn't closed yet.

2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Generally speaking best practise is always to simplify your program as
> much as possible to narrow down the scope of the search. Replace data
> source with statically generated events, remove unnecessary components Etc.
> Either such process help you figure out what’s wrong on your own and if
> not, if you share us such minimal program that reproduces the issue, it
> will allow  us to debug it.
>
> Piotrek
>
>
> On 11 May 2018, at 13:54, Juho Autio <ju...@rovio.com> wrote:
>
> Thanks for that code snippet, I should try it out to simulate my DAG.. If
> any suggestions how to debug futher what's causing late data on a
> production stream job, please let me know.
>
> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hey,
>>
>> Actually I think Fabian initial message was incorrect. As far as I can
>> see in the code of WindowOperator (last lines of org.apache.flink.streaming.
>> runtime.operators.windowing.WindowOperator#processElement ), the element
>> is sent to late side output if it is late AND it wasn’t assigned to any of
>> the existing windows (because they were late as well). In other words, it
>> should work as you Juho are wishing: element should be marked as late once
>> they are overdue/late for the window after one full day.
>>
>> I have tested it and it works as expected. Following program:
>>
>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>
>> Prints only ONE number to the standard err:
>>
>> > 1394
>>
>> And there is nothing on the side output.
>>
>> Piotrek
>>
>> On 11 May 2018, at 12:32, Juho Autio <ju...@rovio.com> wrote:
>>
>> Thanks. What I still don't get is why my message got filtered in the
>> first place. Even if the allowed lateness filtering would be done "on the
>> window", data should not be dropped as late if it's not in fact late by
>> more than the allowedLateness setting.
>>
>> Assuming that these conditions hold:
>> - messages (and thus the extracted timestamps) were not out of order by
>> more than 5 secods (as far as I didn't make any mistake in my
>> partition-level analysis)
>> - allowedLateness=1 minute
>> - watermarks are assigned on kafka consumer meaning that they are
>> synchronized across all partitions
>>
>> I don't see how the watermark could have ever been more than 5 seconds
>> further when the message arrives on the isElementLate filter. Do you have
>> any idea on this? Is there some existing test that simulates out of order
>> input to flink's kafka consumer? I could try to build a test case based on
>> that to possibly reproduce my problem. I'm not sure how to gather enough
>> debug information on the production stream so that it would clearly show
>> the watermarks, how they progressed on each kafka partition & later in the
>> chain in case isElementLate filters something.
>>
>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fh...@gmail.com>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> Thanks for bringing up this topic! I share your intuition.
>>> IMO, records should only be filtered out and send to a side output if
>>> any of the windows they would be assigned to is closed already.
>>>
>>> I had a look into the code and found that records are filtered out as
>>> late based on the following condition:
>>>
>>> protected boolean isElementLate(StreamRecord<IN> element){
>>>    return (windowAssigner.isEventTime()) &&
>>>       (element.getTimestamp() + allowedLateness <=
>>> internalTimerService.currentWatermark());
>>> }
>>>
>>>
>>> This code shows that your analysis is correct.
>>> Records are filtered out based on their timestamp and the current
>>> watermark, even though they arrive before the window is closed.
>>>
>>> OTOH, filtering out records based on the window they would end up in can
>>> also be tricky if records are assigned to multiple windows (e.g., sliding
>>> windows).
>>> In this case, a side-outputted records could still be in some windows
>>> and not in others.
>>>
>>> @Aljoscha (CC) Might have an explanation for the current behavior.
>>>
>>> Thanks,
>>> Fabian
>>>
>>>
>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <ju...@rovio.com>:
>>>
>>>> I don't understand why I'm getting some data discarded as late on my
>>>> Flink stream job a long time before the window even closes.
>>>>
>>>> I can not be 100% sure, but to me it seems like the kafka consumer is
>>>> basically causing the data to be dropped as "late", not the window. I
>>>> didn't expect this to ever happen?
>>>>
>>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>>> window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor
>>>> on the kafka consumer to synchronize watermarks accross all kafka
>>>> partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>>>
>>>> I have also enabled allowedLateness with 1 minute lateness on the
>>>> 24-hour window:
>>>>
>>>> .timeWindow(Time.days(1))
>>>> .allowedLateness(Time.minutes(1))
>>>> .sideOutputLateData(lateDataTag)
>>>> .reduce(new DistinctFunction())
>>>>
>>>> I have used accumulators to see that there is some late data. I have
>>>> had multiple occurrences of those.
>>>>
>>>> Now focusing on a particular case that I was investigating more
>>>> closely. Around ~12:15 o-clock my late data accumulator started showing
>>>> that 1 message had been late. That's in the middle of the time window – so
>>>> why would this happen? I would expect late data to be discarded only
>>>> sometime after 00:01 if some data is arriving late for the window that just
>>>> closed at 00:00, and doesn't get emitted as part of 1 minute
>>>> allowedLateness.
>>>>
>>>> To analyze the timestamps I read all messages in sequence separately
>>>> from each kafka partition and calculated the difference in timestamps
>>>> between consecutive messages. I had had exactly one message categorized as
>>>> late by Flink in this case, and at the time i was using maxOutOfOrderness
>>>> = 5 seconds. I found exactly one message in one kafka partition where the
>>>> timestamp difference between messages was 5 seconds (they were out of order
>>>> by 5 s), which makes me wonder, did Flink drop the event as late because it
>>>> violated maxOutOfOrderness? Have I misunderstood the concept of late
>>>> data somehow? I only expected late data to happen on window operations. I
>>>> would expect kafka consumer to pass "late" messages onward even though
>>>> watermark doesn't change.
>>>>
>>>> Thank you very much if you can find the time to look at this!
>>>>
>>>
>>>
>>
>>
>
>
>

Re: Late data before window end is even close

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Generally speaking best practise is always to simplify your program as much as possible to narrow down the scope of the search. Replace data source with statically generated events, remove unnecessary components Etc. Either such process help you figure out what’s wrong on your own and if not, if you share us such minimal program that reproduces the issue, it will allow  us to debug it.

Piotrek

> On 11 May 2018, at 13:54, Juho Autio <ju...@rovio.com> wrote:
> 
> Thanks for that code snippet, I should try it out to simulate my DAG.. If any suggestions how to debug futher what's causing late data on a production stream job, please let me know.
> 
> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hey,
> 
> Actually I think Fabian initial message was incorrect. As far as I can see in the code of WindowOperator (last lines of org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement ), the element is sent to late side output if it is late AND it wasn’t assigned to any of the existing windows (because they were late as well). In other words, it should work as you Juho are wishing: element should be marked as late once they are overdue/late for the window after one full day.
> 
> I have tested it and it works as expected. Following program:
> 
> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a <https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a>
> 
> Prints only ONE number to the standard err:
> 
> > 1394
> 
> And there is nothing on the side output.
> 
> Piotrek
> 
>> On 11 May 2018, at 12:32, Juho Autio <juho.autio@rovio.com <ma...@rovio.com>> wrote:
>> 
>> Thanks. What I still don't get is why my message got filtered in the first place. Even if the allowed lateness filtering would be done "on the window", data should not be dropped as late if it's not in fact late by more than the allowedLateness setting.
>> 
>> Assuming that these conditions hold:
>> - messages (and thus the extracted timestamps) were not out of order by more than 5 secods (as far as I didn't make any mistake in my partition-level analysis)
>> - allowedLateness=1 minute
>> - watermarks are assigned on kafka consumer meaning that they are synchronized across all partitions
>> 
>> I don't see how the watermark could have ever been more than 5 seconds further when the message arrives on the isElementLate filter. Do you have any idea on this? Is there some existing test that simulates out of order input to flink's kafka consumer? I could try to build a test case based on that to possibly reproduce my problem. I'm not sure how to gather enough debug information on the production stream so that it would clearly show the watermarks, how they progressed on each kafka partition & later in the chain in case isElementLate filters something.
>> 
>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhueske@gmail.com <ma...@gmail.com>> wrote:
>> Hi Juho,
>> 
>> Thanks for bringing up this topic! I share your intuition. 
>> IMO, records should only be filtered out and send to a side output if any of the windows they would be assigned to is closed already.
>> 
>> I had a look into the code and found that records are filtered out as late based on the following condition:
>>  
>> protected boolean isElementLate(StreamRecord<IN> element){
>>    return (windowAssigner.isEventTime()) &&
>>       (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
>> }
>> 
>> 
>> This code shows that your analysis is correct. 
>> Records are filtered out based on their timestamp and the current watermark, even though they arrive before the window is closed.
>> 
>> OTOH, filtering out records based on the window they would end up in can also be tricky if records are assigned to multiple windows (e.g., sliding windows).
>> In this case, a side-outputted records could still be in some windows and not in others. 
>> 
>> @Aljoscha (CC) Might have an explanation for the current behavior.
>> 
>> Thanks,
>> Fabian
>> 
>> 
>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>> I don't understand why I'm getting some data discarded as late on my Flink stream job a long time before the window even closes.
>> 
>> I can not be 100% sure, but to me it seems like the kafka consumer is basically causing the data to be dropped as "late", not the window. I didn't expect this to ever happen?
>> 
>> I have a Flink stream job that gathers distinct values using a 24-hour window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks accross all kafka partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>> 
>> I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:
>> 
>> .timeWindow(Time.days(1))
>> .allowedLateness(Time.minutes(1))
>> .sideOutputLateData(lateDataTag)
>> .reduce(new DistinctFunction())
>> 
>> I have used accumulators to see that there is some late data. I have had multiple occurrences of those.
>> 
>> Now focusing on a particular case that I was investigating more closely. Around ~12:15 o-clock my late data accumulator started showing that 1 message had been late. That's in the middle of the time window – so why would this happen? I would expect late data to be discarded only sometime after 00:01 if some data is arriving late for the window that just closed at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.
>> 
>> To analyze the timestamps I read all messages in sequence separately from each kafka partition and calculated the difference in timestamps between consecutive messages. I had had exactly one message categorized as late by Flink in this case, and at the time i was using maxOutOfOrderness = 5 seconds. I found exactly one message in one kafka partition where the timestamp difference between messages was 5 seconds (they were out of order by 5 s), which makes me wonder, did Flink drop the event as late because it violated maxOutOfOrderness? Have I misunderstood the concept of late data somehow? I only expected late data to happen on window operations. I would expect kafka consumer to pass "late" messages onward even though watermark doesn't change.
>> 
>> Thank you very much if you can find the time to look at this!
>> 
>> 
> 
> 
> 


Re: Late data before window end is even close

Posted by Juho Autio <ju...@rovio.com>.
Thanks for that code snippet, I should try it out to simulate my DAG.. If
any suggestions how to debug futher what's causing late data on a
production stream job, please let me know.

On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hey,
>
> Actually I think Fabian initial message was incorrect. As far as I can see
> in the code of WindowOperator (last lines of org.apache.flink.streaming.
> runtime.operators.windowing.WindowOperator#processElement ), the element
> is sent to late side output if it is late AND it wasn’t assigned to any of
> the existing windows (because they were late as well). In other words, it
> should work as you Juho are wishing: element should be marked as late once
> they are overdue/late for the window after one full day.
>
> I have tested it and it works as expected. Following program:
>
> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>
> Prints only ONE number to the standard err:
>
> > 1394
>
> And there is nothing on the side output.
>
> Piotrek
>
> On 11 May 2018, at 12:32, Juho Autio <ju...@rovio.com> wrote:
>
> Thanks. What I still don't get is why my message got filtered in the first
> place. Even if the allowed lateness filtering would be done "on the
> window", data should not be dropped as late if it's not in fact late by
> more than the allowedLateness setting.
>
> Assuming that these conditions hold:
> - messages (and thus the extracted timestamps) were not out of order by
> more than 5 secods (as far as I didn't make any mistake in my
> partition-level analysis)
> - allowedLateness=1 minute
> - watermarks are assigned on kafka consumer meaning that they are
> synchronized across all partitions
>
> I don't see how the watermark could have ever been more than 5 seconds
> further when the message arrives on the isElementLate filter. Do you have
> any idea on this? Is there some existing test that simulates out of order
> input to flink's kafka consumer? I could try to build a test case based on
> that to possibly reproduce my problem. I'm not sure how to gather enough
> debug information on the production stream so that it would clearly show
> the watermarks, how they progressed on each kafka partition & later in the
> chain in case isElementLate filters something.
>
> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> Thanks for bringing up this topic! I share your intuition.
>> IMO, records should only be filtered out and send to a side output if any
>> of the windows they would be assigned to is closed already.
>>
>> I had a look into the code and found that records are filtered out as
>> late based on the following condition:
>>
>> protected boolean isElementLate(StreamRecord<IN> element){
>>    return (windowAssigner.isEventTime()) &&
>>       (element.getTimestamp() + allowedLateness <=
>> internalTimerService.currentWatermark());
>> }
>>
>>
>> This code shows that your analysis is correct.
>> Records are filtered out based on their timestamp and the current
>> watermark, even though they arrive before the window is closed.
>>
>> OTOH, filtering out records based on the window they would end up in can
>> also be tricky if records are assigned to multiple windows (e.g., sliding
>> windows).
>> In this case, a side-outputted records could still be in some windows and
>> not in others.
>>
>> @Aljoscha (CC) Might have an explanation for the current behavior.
>>
>> Thanks,
>> Fabian
>>
>>
>> 2018-05-11 10:55 GMT+02:00 Juho Autio <ju...@rovio.com>:
>>
>>> I don't understand why I'm getting some data discarded as late on my
>>> Flink stream job a long time before the window even closes.
>>>
>>> I can not be 100% sure, but to me it seems like the kafka consumer is
>>> basically causing the data to be dropped as "late", not the window. I
>>> didn't expect this to ever happen?
>>>
>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>> window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor
>>> on the kafka consumer to synchronize watermarks accross all kafka
>>> partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>>
>>> I have also enabled allowedLateness with 1 minute lateness on the
>>> 24-hour window:
>>>
>>> .timeWindow(Time.days(1))
>>> .allowedLateness(Time.minutes(1))
>>> .sideOutputLateData(lateDataTag)
>>> .reduce(new DistinctFunction())
>>>
>>> I have used accumulators to see that there is some late data. I have
>>> had multiple occurrences of those.
>>>
>>> Now focusing on a particular case that I was investigating more closely.
>>> Around ~12:15 o-clock my late data accumulator started showing that 1
>>> message had been late. That's in the middle of the time window – so why
>>> would this happen? I would expect late data to be discarded only sometime
>>> after 00:01 if some data is arriving late for the window that just closed
>>> at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.
>>>
>>> To analyze the timestamps I read all messages in sequence separately
>>> from each kafka partition and calculated the difference in timestamps
>>> between consecutive messages. I had had exactly one message categorized as
>>> late by Flink in this case, and at the time i was using maxOutOfOrderness
>>> = 5 seconds. I found exactly one message in one kafka partition where the
>>> timestamp difference between messages was 5 seconds (they were out of order
>>> by 5 s), which makes me wonder, did Flink drop the event as late because it
>>> violated maxOutOfOrderness? Have I misunderstood the concept of late
>>> data somehow? I only expected late data to happen on window operations. I
>>> would expect kafka consumer to pass "late" messages onward even though
>>> watermark doesn't change.
>>>
>>> Thank you very much if you can find the time to look at this!
>>>
>>
>>
>
>

Re: Late data before window end is even close

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hey,

Actually I think Fabian initial message was incorrect. As far as I can see in the code of WindowOperator (last lines of org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement ), the element is sent to late side output if it is late AND it wasn’t assigned to any of the existing windows (because they were late as well). In other words, it should work as you Juho are wishing: element should be marked as late once they are overdue/late for the window after one full day.

I have tested it and it works as expected. Following program:

https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a <https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a>

Prints only ONE number to the standard err:

> 1394

And there is nothing on the side output.

Piotrek

> On 11 May 2018, at 12:32, Juho Autio <ju...@rovio.com> wrote:
> 
> Thanks. What I still don't get is why my message got filtered in the first place. Even if the allowed lateness filtering would be done "on the window", data should not be dropped as late if it's not in fact late by more than the allowedLateness setting.
> 
> Assuming that these conditions hold:
> - messages (and thus the extracted timestamps) were not out of order by more than 5 secods (as far as I didn't make any mistake in my partition-level analysis)
> - allowedLateness=1 minute
> - watermarks are assigned on kafka consumer meaning that they are synchronized across all partitions
> 
> I don't see how the watermark could have ever been more than 5 seconds further when the message arrives on the isElementLate filter. Do you have any idea on this? Is there some existing test that simulates out of order input to flink's kafka consumer? I could try to build a test case based on that to possibly reproduce my problem. I'm not sure how to gather enough debug information on the production stream so that it would clearly show the watermarks, how they progressed on each kafka partition & later in the chain in case isElementLate filters something.
> 
> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhueske@gmail.com <ma...@gmail.com>> wrote:
> Hi Juho,
> 
> Thanks for bringing up this topic! I share your intuition. 
> IMO, records should only be filtered out and send to a side output if any of the windows they would be assigned to is closed already.
> 
> I had a look into the code and found that records are filtered out as late based on the following condition:
>  
> protected boolean isElementLate(StreamRecord<IN> element){
>    return (windowAssigner.isEventTime()) &&
>       (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
> }
> 
> 
> This code shows that your analysis is correct. 
> Records are filtered out based on their timestamp and the current watermark, even though they arrive before the window is closed.
> 
> OTOH, filtering out records based on the window they would end up in can also be tricky if records are assigned to multiple windows (e.g., sliding windows).
> In this case, a side-outputted records could still be in some windows and not in others. 
> 
> @Aljoscha (CC) Might have an explanation for the current behavior.
> 
> Thanks,
> Fabian
> 
> 
> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
> I don't understand why I'm getting some data discarded as late on my Flink stream job a long time before the window even closes.
> 
> I can not be 100% sure, but to me it seems like the kafka consumer is basically causing the data to be dropped as "late", not the window. I didn't expect this to ever happen?
> 
> I have a Flink stream job that gathers distinct values using a 24-hour window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks accross all kafka partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
> 
> I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:
> 
> .timeWindow(Time.days(1))
> .allowedLateness(Time.minutes(1))
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
> 
> I have used accumulators to see that there is some late data. I have had multiple occurrences of those.
> 
> Now focusing on a particular case that I was investigating more closely. Around ~12:15 o-clock my late data accumulator started showing that 1 message had been late. That's in the middle of the time window – so why would this happen? I would expect late data to be discarded only sometime after 00:01 if some data is arriving late for the window that just closed at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.
> 
> To analyze the timestamps I read all messages in sequence separately from each kafka partition and calculated the difference in timestamps between consecutive messages. I had had exactly one message categorized as late by Flink in this case, and at the time i was using maxOutOfOrderness = 5 seconds. I found exactly one message in one kafka partition where the timestamp difference between messages was 5 seconds (they were out of order by 5 s), which makes me wonder, did Flink drop the event as late because it violated maxOutOfOrderness? Have I misunderstood the concept of late data somehow? I only expected late data to happen on window operations. I would expect kafka consumer to pass "late" messages onward even though watermark doesn't change.
> 
> Thank you very much if you can find the time to look at this!
> 
> 


Re: Late data before window end is even close

Posted by Juho Autio <ju...@rovio.com>.
Thanks. What I still don't get is why my message got filtered in the first
place. Even if the allowed lateness filtering would be done "on the
window", data should not be dropped as late if it's not in fact late by
more than the allowedLateness setting.

Assuming that these conditions hold:
- messages (and thus the extracted timestamps) were not out of order by
more than 5 secods (as far as I didn't make any mistake in my
partition-level analysis)
- allowedLateness=1 minute
- watermarks are assigned on kafka consumer meaning that they are
synchronized across all partitions

I don't see how the watermark could have ever been more than 5 seconds
further when the message arrives on the isElementLate filter. Do you have
any idea on this? Is there some existing test that simulates out of order
input to flink's kafka consumer? I could try to build a test case based on
that to possibly reproduce my problem. I'm not sure how to gather enough
debug information on the production stream so that it would clearly show
the watermarks, how they progressed on each kafka partition & later in the
chain in case isElementLate filters something.

On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Juho,
>
> Thanks for bringing up this topic! I share your intuition.
> IMO, records should only be filtered out and send to a side output if any
> of the windows they would be assigned to is closed already.
>
> I had a look into the code and found that records are filtered out as late
> based on the following condition:
>
> protected boolean isElementLate(StreamRecord<IN> element){
>    return (windowAssigner.isEventTime()) &&
>       (element.getTimestamp() + allowedLateness <= internalTimerService.
> currentWatermark());
> }
>
>
> This code shows that your analysis is correct.
> Records are filtered out based on their timestamp and the current
> watermark, even though they arrive before the window is closed.
>
> OTOH, filtering out records based on the window they would end up in can
> also be tricky if records are assigned to multiple windows (e.g., sliding
> windows).
> In this case, a side-outputted records could still be in some windows and
> not in others.
>
> @Aljoscha (CC) Might have an explanation for the current behavior.
>
> Thanks,
> Fabian
>
>
> 2018-05-11 10:55 GMT+02:00 Juho Autio <ju...@rovio.com>:
>
>> I don't understand why I'm getting some data discarded as late on my
>> Flink stream job a long time before the window even closes.
>>
>> I can not be 100% sure, but to me it seems like the kafka consumer is
>> basically causing the data to be dropped as "late", not the window. I
>> didn't expect this to ever happen?
>>
>> I have a Flink stream job that gathers distinct values using a 24-hour
>> window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor
>> on the kafka consumer to synchronize watermarks accross all kafka
>> partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>
>> I have also enabled allowedLateness with 1 minute lateness on the
>> 24-hour window:
>>
>> .timeWindow(Time.days(1))
>> .allowedLateness(Time.minutes(1))
>> .sideOutputLateData(lateDataTag)
>> .reduce(new DistinctFunction())
>>
>> I have used accumulators to see that there is some late data. I have had
>> multiple occurrences of those.
>>
>> Now focusing on a particular case that I was investigating more closely.
>> Around ~12:15 o-clock my late data accumulator started showing that 1
>> message had been late. That's in the middle of the time window – so why
>> would this happen? I would expect late data to be discarded only sometime
>> after 00:01 if some data is arriving late for the window that just closed
>> at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.
>>
>> To analyze the timestamps I read all messages in sequence separately from
>> each kafka partition and calculated the difference in timestamps between
>> consecutive messages. I had had exactly one message categorized as late by
>> Flink in this case, and at the time i was using maxOutOfOrderness = 5
>> seconds. I found exactly one message in one kafka partition where the
>> timestamp difference between messages was 5 seconds (they were out of order
>> by 5 s), which makes me wonder, did Flink drop the event as late because it
>> violated maxOutOfOrderness? Have I misunderstood the concept of late
>> data somehow? I only expected late data to happen on window operations. I
>> would expect kafka consumer to pass "late" messages onward even though
>> watermark doesn't change.
>>
>> Thank you very much if you can find the time to look at this!
>>
>
>

Re: Late data before window end is even close

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Juho,

Thanks for bringing up this topic! I share your intuition.
IMO, records should only be filtered out and send to a side output if any
of the windows they would be assigned to is closed already.

I had a look into the code and found that records are filtered out as late
based on the following condition:

protected boolean isElementLate(StreamRecord<IN> element){
   return (windowAssigner.isEventTime()) &&
      (element.getTimestamp() + allowedLateness <=
internalTimerService.currentWatermark());
}


This code shows that your analysis is correct.
Records are filtered out based on their timestamp and the current
watermark, even though they arrive before the window is closed.

OTOH, filtering out records based on the window they would end up in can
also be tricky if records are assigned to multiple windows (e.g., sliding
windows).
In this case, a side-outputted records could still be in some windows and
not in others.

@Aljoscha (CC) Might have an explanation for the current behavior.

Thanks,
Fabian


2018-05-11 10:55 GMT+02:00 Juho Autio <ju...@rovio.com>:

> I don't understand why I'm getting some data discarded as late on my Flink
> stream job a long time before the window even closes.
>
> I can not be 100% sure, but to me it seems like the kafka consumer is
> basically causing the data to be dropped as "late", not the window. I
> didn't expect this to ever happen?
>
> I have a Flink stream job that gathers distinct values using a 24-hour
> window. It reads the data from Kafka, using a
> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
> synchronize watermarks accross all kafka partitions. The maxOutOfOrderness
> of the extractor is set to 10 seconds.
>
> I have also enabled allowedLateness with 1 minute lateness on the 24-hour
> window:
>
> .timeWindow(Time.days(1))
> .allowedLateness(Time.minutes(1))
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
>
> I have used accumulators to see that there is some late data. I have had
> multiple occurrences of those.
>
> Now focusing on a particular case that I was investigating more closely.
> Around ~12:15 o-clock my late data accumulator started showing that 1
> message had been late. That's in the middle of the time window – so why
> would this happen? I would expect late data to be discarded only sometime
> after 00:01 if some data is arriving late for the window that just closed
> at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.
>
> To analyze the timestamps I read all messages in sequence separately from
> each kafka partition and calculated the difference in timestamps between
> consecutive messages. I had had exactly one message categorized as late by
> Flink in this case, and at the time i was using maxOutOfOrderness = 5
> seconds. I found exactly one message in one kafka partition where the
> timestamp difference between messages was 5 seconds (they were out of order
> by 5 s), which makes me wonder, did Flink drop the event as late because it
> violated maxOutOfOrderness? Have I misunderstood the concept of late data
> somehow? I only expected late data to happen on window operations. I would
> expect kafka consumer to pass "late" messages onward even though watermark
> doesn't change.
>
> Thank you very much if you can find the time to look at this!
>