Posted to user@flink.apache.org by Matthias Broecheler <ma...@dataeng.ai> on 2021/08/13 19:43:42 UTC

Periodic output at end of stream

Hey guys,

I have a KeyedProcessFunction that gathers statistics on the events that
flow through it and periodically (every few seconds) emits them to a side
output. However, at the end of the stream the last set of statistics doesn't
get emitted.
I read on the mailing list that pending processing-time timers don't fire
when Flink cleans up a stream, but that event-time timers do fire because a
watermark with Long.MAX_VALUE is sent through the stream.
Hence, I thought that I could register a "backup" event-time timer for
Long.MAX_VALUE-1 to make sure that my process function gets notified when
the stream ends, so that it can emit the in-flight statistics.
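
The "backup timer" idea can be modeled in plain Java. This is a simplified sketch, not Flink code, and the class and method names are hypothetical. It assumes the rule Flink's event-time timer service follows: a watermark fires every pending timer whose timestamp is less than or equal to the watermark. A timer at Long.MAX_VALUE-1 therefore stays pending until the final Long.MAX_VALUE watermark that a bounded source emits at the end of its input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of an event-time timer queue (NOT Flink code; names are hypothetical).
final class EndOfStreamTimerModel {
    // Pending event-time timers ordered by timestamp; a set keeps one entry per timestamp.
    private final TreeSet<Long> pending = new TreeSet<>();
    // Timestamps of the timers that have fired, in firing order.
    final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        pending.add(timestamp);
    }

    // A watermark fires every pending timer whose timestamp is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!pending.isEmpty() && pending.first() <= watermark) {
            fired.add(pending.pollFirst());
        }
    }

    public static void main(String[] args) {
        EndOfStreamTimerModel timers = new EndOfStreamTimerModel();
        // The "backup" timer: ordinary watermarks stay below it.
        timers.registerEventTimeTimer(Long.MAX_VALUE - 1);
        timers.advanceWatermark(1_000L);
        System.out.println(timers.fired); // []
        // End of a bounded input: the final watermark is Long.MAX_VALUE.
        timers.advanceWatermark(Long.MAX_VALUE);
        System.out.println(timers.fired); // [9223372036854775806]
    }
}
```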

However, now my simple test case (a fromCollection source with 4
elements) keeps iterating over the same 4 elements in an infinite loop.

I don't know how to make sense of this and would appreciate your help.
Is there a better way to set a timer that fires at the end of the
stream?
And for my own education: why does registering an event-time timer cause
an infinite loop over the source elements?

Thanks a lot and have a wonderful weekend,
Matthias

Re: Periodic output at end of stream

Posted by JING ZHANG <be...@gmail.com>.
Hi Matthias,
You're welcome. I'm glad the information was helpful.

Best,
JING ZHANG

On Sun, Aug 22, 2021 at 3:48 AM Matthias Broecheler <ma...@dataeng.ai> wrote:

> Thank you so much for debugging this, JING. This helps me better
> understand how Flink operates internally. I appreciate the pointer to
> disabling checkpointing in order to see the stack trace.
>
> Have a great weekend,
> Matthias
>
> On Sat, Aug 21, 2021 at 3:36 AM JING ZHANG <be...@gmail.com> wrote:
>
>> Hi Matthias,
>> After debugging, I found that *the loop happens because the job keeps
>> trying to recover from an exception*. The exception is as follows:
>> [image: screenshot of the NullPointerException stack trace]
>>
>> *The root cause of the NullPointerException is that
>> `BufferedLatestSelector#onTimer` can send a null value downstream.*
>> When is the `latest` value state null? The bounded stream ends very
>> quickly, so all the MAX_VALUE rowtime (event-time) timers fire.
>> Then, when the registered proctime (processing-time) timer fires, the
>> `latest` value is null, so `BufferedLatestSelector` sends a null value to
>> `PrintSinkFunction`, which causes the NullPointerException.
>> *You can fix the exception by checking that the value is not null before
>> sending the output downstream in `BufferedLatestSelector#onTimer`.*
>> Also, you can see the exception stack trace above if you remove
>> `FlinkUtilities.enableCheckpointing(flinkEnv);` from `MainRepl#main`.
>> Feel free to follow up if you run into any other problems.
>>
>> Best,
>> JING ZHANG
>>
>> On Fri, Aug 20, 2021 at 1:00 PM JING ZHANG <be...@gmail.com> wrote:
>>
>>> Hi Matthias,
>>> Thanks for providing the example. I will get back to you soon after some
>>> debugging.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Thu, Aug 19, 2021 at 1:53 AM Matthias Broecheler <ma...@dataeng.ai> wrote:
>>>
>>>> Hey JING,
>>>>
>>>> Thanks for getting back to me. I put together the smallest
>>>> self-contained example that reproduces the phenomenon:
>>>> https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f
>>>>
>>>> If you run MainRepl, you should see an infinite loop of re-processing
>>>> the 5 integers. The offending process function is BufferedLatestSelector,
>>>> specifically the event-time timer that is registered in it. Without the
>>>> timer, the process function does not emit any output.
>>>>
>>>> The timer is set whenever the state is null. Is there a problem with
>>>> how I implemented that buffering process function?
>>>> Thank you,
>>>> Matthias
>>>>
>>>> On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG <be...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Matthias,
>>>>> How often do you register the event-time timer?
>>>>> Is it registered once per input record, or do you register a new timer
>>>>> each time an event-time timer fires?
>>>>> Could you please share your test case code? It would be very helpful
>>>>> for troubleshooting.
>>>>>
>>>>> Best wishes,
>>>>> JING ZHANG
>>>>>

Re: Periodic output at end of stream

Posted by Matthias Broecheler <ma...@dataeng.ai>.
Thank you so much for debugging this, JING. This helps me better understand
how Flink operates internally. I appreciate the pointer to disabling
checkpointing in order to see the stack trace.

Have a great weekend,
Matthias

Re: Periodic output at end of stream

Posted by JING ZHANG <be...@gmail.com>.
Hi Matthias,
After debugging, I found that *the loop happens because the job keeps trying
to recover from an exception*. The exception is as follows:
[image: screenshot of the NullPointerException stack trace]

*The root cause of the NullPointerException is that
`BufferedLatestSelector#onTimer` can send a null value downstream.*
When is the `latest` value state null? The bounded stream ends very
quickly, so all the MAX_VALUE rowtime (event-time) timers fire.
Then, when the registered proctime (processing-time) timer fires, the
`latest` value is null, so `BufferedLatestSelector` sends a null value to
`PrintSinkFunction`, which causes the NullPointerException.
*You can fix the exception by checking that the value is not null before
sending the output downstream in `BufferedLatestSelector#onTimer`.*
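
Here is a minimal plain-Java sketch of that null check. It assumes a buffer that keeps only the latest element and is flushed by whichever timer fires. It models the pattern described here but is not the code from the gist and not the Flink API. `LatestBufferModel` and its methods are hypothetical names, and the Consumer stands in for a sink such as PrintSinkFunction that dereferences every record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a "keep the latest value, flush on timer" operator.
// NOT the gist's code and NOT Flink API: all names here are hypothetical.
final class LatestBufferModel {
    private Integer latest; // stands in for the `latest` ValueState; null means empty
    private final Consumer<Integer> downstream; // stands in for the output/sink

    LatestBufferModel(Consumer<Integer> downstream) {
        this.downstream = downstream;
    }

    void processElement(int value) {
        latest = value; // remember only the most recent element
    }

    // Invoked for every timer, event-time or processing-time alike.
    void onTimer() {
        if (latest != null) {   // the fix: skip the flush when nothing is buffered
            downstream.accept(latest);
            latest = null;      // buffer flushed, so clear the state
        }
    }

    public static void main(String[] args) {
        List<String> printed = new ArrayList<>();
        // Like a print sink, this dereferences the record and would throw on null.
        LatestBufferModel op = new LatestBufferModel(v -> printed.add(v.toString()));
        for (int i = 1; i <= 5; i++) {
            op.processElement(i);
        }
        op.onTimer(); // end-of-input event-time timer flushes the buffered 5
        op.onTimer(); // later processing-time timer: buffer empty, nothing emitted
        System.out.println(printed); // [5]
    }
}
```

In the real KeyedProcessFunction, the same guard would presumably wrap whatever call emits the value in onTimer (for example `out.collect(...)`), with the value read from the `latest` ValueState.
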
Also, you can see the exception stack trace above if you remove
`FlinkUtilities.enableCheckpointing(flinkEnv);` from `MainRepl#main`.
Feel free to follow up if you run into any other problems.
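
This likely explains why the NullPointerException showed up as an endless loop rather than a failed job. According to the Flink documentation of that period (e.g. 1.13), if checkpointing is enabled and no restart strategy is configured, Flink uses the fixed-delay restart strategy with Integer.MAX_VALUE restart attempts. Each restart re-runs the job, so the fromCollection source replays its elements and the same failure happens again. The plain-Java model below is not Flink code, and its names and restart limit are hypothetical. It shows that replay with a small restart budget.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "fail at end of input, restart, replay the bounded source".
// NOT Flink code: the method name and the restart limit are hypothetical.
final class RestartLoopModel {
    // Returns the attempt number on which the job finished, or throws once the
    // restart budget is used up. Every attempt re-reads the whole source into `seen`.
    static int run(List<Integer> source, boolean emitsNullAtEnd, int maxRestarts, List<Integer> seen) {
        for (int attempt = 1; ; attempt++) {
            seen.addAll(source); // a restart replays the bounded source from the start
            if (!emitsNullAtEnd) {
                return attempt; // job finishes normally
            }
            // The null reaching the sink fails this attempt. With
            // maxRestarts == Integer.MAX_VALUE this check is never true.
            if (attempt > maxRestarts) {
                throw new IllegalStateException("job failed after " + attempt + " attempts");
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        List<Integer> seen = new ArrayList<>();
        System.out.println(run(source, false, 3, seen)); // 1
        seen.clear();
        try {
            run(source, true, 3, seen);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // job failed after 4 attempts
        }
        System.out.println(seen.size()); // 20 (5 elements read on each of 4 attempts)
    }
}
```

In a real job, configuring an explicit restart strategy, for example `env.setRestartStrategy(RestartStrategies.noRestart())` on the StreamExecutionEnvironment, should surface the exception without turning checkpointing off.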

Best,
JING ZHANG

Re: Periodic output at end of stream

Posted by JING ZHANG <be...@gmail.com>.
Hi Matthias,
Thanks for providing the example. I will get back to you soon after some
debugging.

Best,
JING ZHANG

Re: Periodic output at end of stream

Posted by Matthias Broecheler <ma...@dataeng.ai>.
Hey JING,

Thanks for getting back to me. I put together the smallest self-contained
example that reproduces the phenomenon:
https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f

If you run MainRepl, you should see an infinite loop of re-processing the 5
integers. The offending process function is BufferedLatestSelector,
specifically the event-time timer that is registered in it. Without the
timer, the process function does not emit any output.

The timer is set whenever the state is null. Is there a problem with how I
implemented that buffering process function?
Thank you,
Matthias

Re: Periodic output at end of stream

Posted by JING ZHANG <be...@gmail.com>.
Hi Matthias,
How often do you register the event-time timer?
Is it registered once per input record, or do you register a new timer each
time an event-time timer fires?
Could you please share your test case code? It would be very helpful for
troubleshooting.
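
Registration frequency matters because of how Flink stores timers. According to the Flink ProcessFunction documentation, timers are deduplicated per key and timestamp, so registering the same timestamp many times for one key still results in a single onTimer() call. A Long.MAX_VALUE-1 timer registered on every record should therefore fire once per key. Processing-time timers computed as the current time plus an interval usually get distinct timestamps and do not collapse. The following plain-Java model illustrates the deduplication rule. It is not Flink's implementation, and its names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model of per-key timer deduplication (NOT Flink's implementation).
final class TimerDedupModel {
    // For each key, the set of distinct pending timer timestamps.
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    void register(String key, long timestamp) {
        // Adding an existing timestamp to the set is a no-op, which models deduplication.
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    int pending(String key) {
        Set<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }

    public static void main(String[] args) {
        TimerDedupModel timers = new TimerDedupModel();
        for (int i = 0; i < 5; i++) {
            // Same fixed timestamp for every record of key "a": collapses to one timer.
            timers.register("a", Long.MAX_VALUE - 1);
            // "Now + interval" differs per record, so these timers stay distinct.
            timers.register("b", 1_000L * i + 5_000L);
        }
        System.out.println(timers.pending("a")); // 1
        System.out.println(timers.pending("b")); // 5
    }
}
```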

Best wishes,
JING ZHANG
