Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2021/09/24 04:54:25 UTC

stream processing savepoints and watermarks question

Something strange happened today.
When we tried to shut down a job with a savepoint, the watermarks became
equal to 2^63 - 1.

This caused timers to fire indefinitely and crash downstream systems with
a flood of incorrect data.

We are using event time processing with Kafka as our source.

It seems impossible for a watermark to be that large.

I know it's possible for a stream in batch execution mode.  But this was
stream processing.

What can cause this?  Is this normal behavior when creating a savepoint?

Re: stream processing savepoints and watermarks question

Posted by Marco Villalobos <mv...@kineteque.com>.
Everybody,

Thank you for the quick response.

Yes, we inadvertently used the -d/--drain flag when stopping the job. We
were not aware that it would cause a MAX_WATERMARK to roll through our
system.

A MAX_WATERMARK is catastrophic for the event time timers we have in our
system.

We know now never to use -d again for this situation.

Again, thank you.

-Marco


RE: stream processing savepoints and watermarks question

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi all,

The way I understand it, the described behavior is intentional for event time timers:

  *   When called, an event time handler can register new timers.
  *   The timestamp parameter (override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[K, I, O]#OnTimerContext, out: Collector[O]): Unit = { …)
      is set to the timeout time of the timer, not to the watermark that caused the timeout!
  *   onTimer, as said, can register new timers, and the timeout time can even be in the past;
      in that case the timeout is handled immediately after the current onTimer(…) call.
  *   As long as onTimer registers new timers, the operator will iterate through all of them.

When an operator receives a watermark, it fires all registered timers with timeout <= watermark, in timeout order, including the ones registered in onTimer().

This is especially the case for a MAX_WATERMARK, but it is the same for any watermark that lies in the future.
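
The firing rule described above can be sketched without any Flink dependency. The following is a toy model in plain Java, not Flink's timer service; ToyTimerService, advanceWatermark and FiringOrderDemo are hypothetical names introduced only for illustration. It assumes a single key and a simple priority queue of timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Toy stand-in for an event-time timer service (assumption: one key, no state backend). */
final class ToyTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires every timer with timestamp <= watermark, earliest first. */
    void advanceWatermark(long watermark, LongConsumer onTimer) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long timestamp = timers.poll();
            // The callback receives the timer's own timestamp, not the watermark.
            onTimer.accept(timestamp);
        }
    }
}

final class FiringOrderDemo {
    /** Returns the timestamps passed to the callback, in firing order. */
    static List<Long> run() {
        ToyTimerService service = new ToyTimerService();
        List<Long> fired = new ArrayList<>();
        service.registerEventTimeTimer(10L);
        service.registerEventTimeTimer(20L);
        service.registerEventTimeTimer(30L);
        service.advanceWatermark(25L, timestamp -> {
            fired.add(timestamp);
            if (timestamp == 10L) {
                // A timer registered inside the callback is picked up by the same loop.
                service.registerEventTimeTimer(15L);
            }
        });
        return fired;
    }
}
```

In this sketch, FiringOrderDemo.run() yields the timestamps 10, 15 and 20: the timer added from inside the callback fires in timestamp order within the same watermark, and the timer at 30 stays pending because it lies beyond the watermark of 25.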

For your case, @Marco, you could break this pattern by comparing the timeout to be registered with the current processing time and, if it lies too far in the future, not registering the timer.
That would break the infinite iteration over timers …
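
A minimal sketch of that guard, again in plain Java rather than against the Flink API. PeriodicTimerSketch, drain, maxLeadMs and MAX_FIRINGS are hypothetical names, and nowMs stands in for the current processing time (in a real KeyedProcessFunction it would presumably come from ctx.timerService().currentProcessingTime()). The safety cap exists only so that the unguarded variant terminates in the demo.

```java
import java.util.PriorityQueue;

final class PeriodicTimerSketch {
    /** Safety cap so the unguarded demo terminates; purely illustrative. */
    static final int MAX_FIRINGS = 10_000;

    /**
     * Simulates a MAX_WATERMARK arriving at an operator whose timer callback
     * re-registers itself at (fired timestamp + intervalMs).
     * Returns how many timers fired before the queue drained or the cap was hit.
     */
    static int drain(long firstTimer, long intervalMs, long nowMs, long maxLeadMs, boolean guarded) {
        PriorityQueue<Long> timers = new PriorityQueue<>();
        timers.add(firstTimer);
        long watermark = Long.MAX_VALUE; // what a drain emits, per the thread
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermark && fired < MAX_FIRINGS) {
            long timestamp = timers.poll();
            fired++;
            long next = timestamp + intervalMs;
            // Guard: skip re-registration when the next timeout lies too far
            // beyond the current processing time.
            if (!guarded || next <= nowMs + maxLeadMs) {
                timers.add(next);
            }
        }
        return fired;
    }
}
```

With a first timer at 1000, an interval of 1000, a processing time of 5000 and an allowed lead of 2000, the guarded variant fires 7 timers (1000 through 7000) and then stops, while the unguarded variant keeps going until it hits the cap. How large the allowed lead should be depends on how far event time can legitimately run ahead of processing time in the job.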

I believe the behavior exhibited by Flink is intentional and not a defect!

What do you think?

Thias



Re: stream processing savepoints and watermarks question

Posted by JING ZHANG <be...@gmail.com>.
Hi Guowei,
Thanks for the quick response. Maybe I didn't express it clearly in my last
email.
In fact, the case above happened in reality; it is not something I imagined.
When MAX_WATERMARK is received, the operator will try to fire all
registered event-time timers. However, in the above case, new timers are
continuously being registered.
I will try to reproduce the problem in an ITCase and provide the code once
it is complete.

Best,
JING ZHANG


Re: stream processing savepoints and watermarks question

Posted by Guowei Ma <gu...@gmail.com>.
Hi, JING

Thanks for the case.
But I am not sure this would happen. As far as I know, an event-time timer can
only be triggered when there is a watermark (except in the "quiesce phase").
I think the watermark cannot advance any further after MAX_WATERMARK is received.

Best,
Guowei



Re: stream processing savepoints and watermarks question

Posted by JING ZHANG <be...@gmail.com>.
Hi Guowei,
I can describe a case I have encountered in which timers fire
indefinitely during a savepoint with drain.
After an event timer is triggered, it registers another event timer
whose timestamp equals that of the triggered timer plus an interval.
If a MAX_WATERMARK arrives, the timer is triggered, registers another
timer, and so on forever.
I'm not sure whether Marco has hit a similar problem.
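
As a rough back-of-the-envelope check on "forever": timers spaced one interval apart are finite in number below 2^63 - 1, but the count is enormous. The sketch below is plain Java arithmetic, not Flink code; FiringCount and firingsUntilMaxWatermark are hypothetical names, and it assumes a non-negative start timestamp and a positive interval, both in milliseconds.

```java
final class FiringCount {
    /**
     * Number of timers at startMs, startMs + intervalMs, ... whose timestamp
     * is still <= Long.MAX_VALUE (the value of a MAX_WATERMARK).
     * Assumes startMs >= 0 and intervalMs > 0.
     */
    static long firingsUntilMaxWatermark(long startMs, long intervalMs) {
        // Subtract before dividing so the computation itself cannot overflow.
        return (Long.MAX_VALUE - startMs) / intervalMs + 1;
    }
}
```

For a first timer around late September 2021 in epoch milliseconds (for example 1_632_459_265_000L) and a one-minute interval, this comes out on the order of 10^14 firings, which is indistinguishable from "indefinitely" for any downstream system. Note also that in plain Java, adding the interval to the last such timestamp overflows a long, so what happens past that point depends entirely on the application code.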

Best,
JING ZHANG




Re: stream processing savepoints and watermarks question

Posted by Guowei Ma <gu...@gmail.com>.
Hi Marco

Indeed, as mentioned by JING, if you drain when triggering a
savepoint, you will encounter this MAX_WATERMARK.
But I have a question. In theory, even with MAX_WATERMARK, there will not be
an infinite number of timers. And these timers should be generated by the
application code.
You can share your code if it is convenient for you.

Best,
Guowei



Re: stream processing savepoints and watermarks question

Posted by JING ZHANG <be...@gmail.com>.
Hi Marco,
Did you specify the drain flag when stopping the job with a savepoint?
If the --drain flag is specified, a MAX_WATERMARK will be emitted
before the last checkpoint barrier.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

Best,
JING ZHANG
