You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by jelmer <jk...@gmail.com> on 2018/01/18 23:26:42 UTC

Starting a job that does not use checkpointing from a savepoint is broken ?

I ran into a rather annoying issue today while upgrading a  flink jobs from
flink 1.3.2 to 1.4.0

This particular job does not use checkpointing not state.

I followed the instructions at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html

First created a savepoint, upgraded the cluster, then restarted the job
from the savepoint.

This all went well until later a few hours later one of our kafka nodes
dies.This triggered an exception in the job which was subsequently
restarted.

However instead of picking up where it left off based on the offsets
comitted to kafka (which is what should happen according to
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html)
the kafka offsets where reset to the point when i made the savepoint 3
hours earlier and so it started reprocessing millions of messages.

Needless to say that creating a savepoint for a job without state or
checkpoints does not make that much sense. But I would not expect a restart
from a savepoint to completely break a job in the case of failure.

I created a repository that reproduces the scenario I encountered

https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing

Am I misunderstanding anything or should i file a bug for this ?

Re: Starting a job that does not use checkpointing from a savepoint is broken ?

Posted by jelmer <jk...@gmail.com>.
jelmer <jk...@gmail.com>
07:49 (0 minutes ago)
to Eron
Hey Eron,

Thanks, you stated the issue better and more compact than I could

I will not debate the wisdom of not using checkpoints but when migrating
jobs you may not  be aware if a job has checkpointing enabled, if you are
not the author, and if you follow the upgrade guide to the letter you end
up seriously breaking this job.

Somewhere something is wrong, be it in the documentation or implementation



On 19 January 2018 at 02:05, Eron Wright <er...@gmail.com> wrote:

> To restate the issue:
> When checkpointing is disabled, the Flink Kafka Consumer relies on the
> periodic offsets that are committed to the broker by the internal Kafka
> client.  Such a job would, upon restart, continue from the committed
> offsets.   However, in the situation that the job is restored from a
> savepoint, then the offsets within the savepoint supercede the broker-based
> offsets.
>
> It seems a bit unusual to use the savepoint feature on a job that doesn't
> have checkpointing enabled.  Makes me wonder whether
> `StreamExecutionEnvironment::enableCheckpointing`, is best understood as
> enabling +periodic+ checkpointing.
>
> The docs say that the periodic offset commit feature is not intended for
> fault tolerance, implying to me that you should use Flink's checkpointing
> feature.  A great reason to use Flink checkpointing is to capture the
> intermediate state of the job, such as window state, in addition to the
> consumer offsets.
>
> I hope this helps,
> Eron
>
>
>
>
>
> On Thu, Jan 18, 2018 at 3:26 PM, jelmer <jk...@gmail.com> wrote:
>
>> I ran into a rather annoying issue today while upgrading a  flink jobs
>> from flink 1.3.2 to 1.4.0
>>
>> This particular job does not use checkpointing not state.
>>
>> I followed the instructions at https://ci.apache.org/projects
>> /flink/flink-docs-release-1.4/ops/upgrading.html
>>
>> First created a savepoint, upgraded the cluster, then restarted the job
>> from the savepoint.
>>
>> This all went well until later a few hours later one of our kafka nodes
>> dies.This triggered an exception in the job which was subsequently
>> restarted.
>>
>> However instead of picking up where it left off based on the offsets
>> comitted to kafka (which is what should happen according to
>> https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.4/dev/connectors/kafka.html)  the kafka offsets where reset to the
>> point when i made the savepoint 3 hours earlier and so it started
>> reprocessing millions of messages.
>>
>> Needless to say that creating a savepoint for a job without state or
>> checkpoints does not make that much sense. But I would not expect a restart
>> from a savepoint to completely break a job in the case of failure.
>>
>> I created a repository that reproduces the scenario I encountered
>>
>> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>>
>> Am I misunderstanding anything or should i file a bug for this ?
>>
>>
>>
>

Re: Starting a job that does not use checkpointing from a savepoint is broken ?

Posted by Eron Wright <er...@gmail.com>.
To restate the issue:
When checkpointing is disabled, the Flink Kafka Consumer relies on the
periodic offsets that are committed to the broker by the internal Kafka
client.  Such a job would, upon restart, continue from the committed
offsets.   However, in the situation that the job is restored from a
savepoint, then the offsets within the savepoint supercede the broker-based
offsets.

It seems a bit unusual to use the savepoint feature on a job that doesn't
have checkpointing enabled.  Makes me wonder whether
`StreamExecutionEnvironment::enableCheckpointing`, is best understood as
enabling +periodic+ checkpointing.

The docs say that the periodic offset commit feature is not intended for
fault tolerance, implying to me that you should use Flink's checkpointing
feature.  A great reason to use Flink checkpointing is to capture the
intermediate state of the job, such as window state, in addition to the
consumer offsets.

I hope this helps,
Eron





On Thu, Jan 18, 2018 at 3:26 PM, jelmer <jk...@gmail.com> wrote:

> I ran into a rather annoying issue today while upgrading a  flink jobs
> from flink 1.3.2 to 1.4.0
>
> This particular job does not use checkpointing not state.
>
> I followed the instructions at https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/ops/upgrading.html
>
> First created a savepoint, upgraded the cluster, then restarted the job
> from the savepoint.
>
> This all went well until later a few hours later one of our kafka nodes
> dies.This triggered an exception in the job which was subsequently
> restarted.
>
> However instead of picking up where it left off based on the offsets
> comitted to kafka (which is what should happen according to
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html)  the kafka offsets where reset to
> the point when i made the savepoint 3 hours earlier and so it started
> reprocessing millions of messages.
>
> Needless to say that creating a savepoint for a job without state or
> checkpoints does not make that much sense. But I would not expect a restart
> from a savepoint to completely break a job in the case of failure.
>
> I created a repository that reproduces the scenario I encountered
>
> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>
> Am I misunderstanding anything or should i file a bug for this ?
>
>
>