Posted to user@flink.apache.org by bat man <ti...@gmail.com> on 2021/03/04 14:43:07 UTC

Watermark doesn't progress after job restore from savepoint

Hi All,

I have a job whose source is Kafka. Stream1 partitions the data on a
dynamic key and is joined with static rules (Stream2, also read from Kafka).
I use a KeyedCoProcessFunction to join Stream1 with Stream2. Everything
works fine in a normal run.

To change the watermark generation interval, I stop the job with a
savepoint. When I restart the job from the savepoint, the watermark is stuck
at -9223372036854775808.
Because of this, the process function doesn't emit any results.

What could be the problem?

Thanks,
Hemant

Re: Watermark doesn't progress after job restore from savepoint

Posted by Guowei Ma <gu...@gmail.com>.
Hi,
I think you could try implementing the `CheckpointedFunction` interface;
`FunctionInitializationContext.isRestored` indicates whether the job was
restored from a previous state.

BTW: I am not sure about your scenario, but maybe you could try setting the
idleness configuration [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
Best,
Guowei



Re: Watermark doesn't progress after job restore from savepoint

Posted by bat man <ti...@gmail.com>.
Thanks Piotr. Got it. I had to push the static rules to the Kafka topic
again, as they had expired and been archived from the topic. After that the
pipeline resumed.
Regarding your suggestion of implementing an operator that remembers the
watermark: is there any indicator that the job has been resumed, which I
could use to emit the watermark when the job is restored from a savepoint?


Re: Watermark doesn't progress after job restore from savepoint

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Hemant,

The latest seen watermarks are not persisted in operator state.
Currently the DataStream API assumes that after recovery watermarks will
be re-emitted sooner or later. What probably happens is that one of your
sources emitted watermarks (maybe a very high one, or even
`MAX_WATERMARK`) before the savepoint was taken, and then stopped emitting
them. As long as the job is not restarted, this watermark is kept in
memory. However, after recovery all watermarks in the operators are reset to
`MIN_WATERMARK` (-9223372036854775808), and in your case the watermark of
one of the `KeyedCoProcessFunction` inputs is probably never updated after
the recovery (for operators/functions with multiple inputs, the combined
watermark is the minimum across all inputs).
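
To illustrate the "minimum across all inputs" rule, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class `CombinedWatermarkDemo` and its methods are hypothetical names made up for this example, and the timestamp is arbitrary.

```java
import java.util.Arrays;

// Toy model (not Flink code): a multi-input operator tracks one watermark
// per input and exposes the minimum of them as its combined watermark.
class CombinedWatermarkDemo {
    static final long MIN_WATERMARK = Long.MIN_VALUE; // -9223372036854775808

    private final long[] inputWatermarks;

    CombinedWatermarkDemo(int numInputs) {
        inputWatermarks = new long[numInputs];
        // After a restore, every input starts again from MIN_WATERMARK.
        Arrays.fill(inputWatermarks, MIN_WATERMARK);
    }

    // Record a watermark from one input; watermarks never move backwards.
    void onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
    }

    // The combined watermark is the minimum over all inputs.
    long combined() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        CombinedWatermarkDemo restored = new CombinedWatermarkDemo(2);
        restored.onWatermark(0, 1_000L); // input 0 resumes emitting
        // Input 1 emits nothing after the restore, so the combined
        // watermark stays at MIN_WATERMARK.
        System.out.println(restored.combined());
    }
}
```

In this model the combined watermark only moves once every input has reported a watermark, which matches the symptom of a single silent input holding the whole operator back.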

You would need to make sure, one way or another, that watermarks are
emitted after the recovery. As a last resort, you could probably
implement an operator that remembers the last checkpointed watermark in
its state and re-emits it upon recovery.
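
The "remember and re-emit" idea can be sketched in plain Java as follows. This is only a toy model of the pattern, not a Flink operator: `WatermarkRememberingStage`, `snapshot`, `restore` and `watermarkToReemit` are hypothetical names, and in a real job the snapshot and restore steps would have to be wired to Flink's checkpointed state.

```java
import java.util.OptionalLong;

// Toy model (not Flink code): a pass-through stage that remembers the last
// watermark it forwarded, so the value can be stored in a snapshot and
// re-emitted once after a restore.
class WatermarkRememberingStage {
    static final long MIN_WATERMARK = Long.MIN_VALUE;

    private long lastWatermark = MIN_WATERMARK;

    // Forward a watermark downstream and remember it.
    long onWatermark(long timestamp) {
        lastWatermark = Math.max(lastWatermark, timestamp);
        return lastWatermark;
    }

    // Value to persist when a checkpoint or savepoint is taken.
    long snapshot() {
        return lastWatermark;
    }

    // Rebuild the stage from a previously persisted value.
    static WatermarkRememberingStage restore(long snapshot) {
        WatermarkRememberingStage stage = new WatermarkRememberingStage();
        stage.lastWatermark = snapshot;
        return stage;
    }

    // Watermark to re-emit right after recovery, if one was ever seen.
    OptionalLong watermarkToReemit() {
        return lastWatermark == MIN_WATERMARK
                ? OptionalLong.empty()
                : OptionalLong.of(lastWatermark);
    }

    public static void main(String[] args) {
        WatermarkRememberingStage before = new WatermarkRememberingStage();
        before.onWatermark(5_000L);
        long saved = before.snapshot();

        WatermarkRememberingStage after = WatermarkRememberingStage.restore(saved);
        System.out.println(after.watermarkToReemit());
    }
}
```

A stage restored from a snapshot that never saw a watermark returns an empty result, so nothing is re-emitted in that case.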

Best,
Piotrek
