You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@smartlab.ws> on 2017/09/28 09:46:11 UTC

Question about checkpointing with stateful operators and state recovery

Hi, I've got a couple of questions concerning the topics in the subject:

    1. If an operator is getting applied on a keyed stream, do I still have
to implement the CheckpointedFunction trait and define the snapshotState
and initializeState methods, in order to successfully recover the state
from a job failure?

    2. While using a FlinkKafkaConsumer, enabling checkpointing allows
exactly once semantics end to end, provided that the sink is able to
guarantee the same. Do I have to set
setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly
once semantics in a sink?

    3. What are the advantages of externalized checkpoints and which are
the cases where I would want to use them?

    4. Let's suppose a scenario where: checkpointing is enabled every 10
seconds, I have a kafka consumer which is set to start from the latest
records, a sink providing at least once semantics and a stateful keyed
operator inbetween the consumer and the sink. Is it correct that, in case
of task failure, happens the following?
        - the kafka consumer gets reverted to the latest offset (does it
happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
        - the operator state gets reverted to the latest checkpoint
        - the sink is stateless so it doesn't really care about what
happened
        - the stream restarts and probably some of the events coming to the
sink have already been     processed before

Thank you for attention,
Kind regards,
Federico

Re: Question about checkpointing with stateful operators and state recovery

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Frederico,

I'll try and give some answers:

1. Generally speaking, no. If you use keyed state, for example via RuntimeContext you don't need to implement CheckpointedFunction.

2. You don't have to set setCommitOffsetsOnCheckpoints(true), this only affects how offsets are committed to Kafka in case other systems want to check that offset. To get exactly once semantics you have two general paths: 1) your sink is idempotent, meaning it doesn't matter whether you write output multiple times 2) the sink has to be integrated with Flink checkpointing and transactions. 2) was not easily possible for Kafka until Kafka 0.11 introduced transaction support. Flink 1.4 will have a Kafka 0.11 producer that supports transactions so with that you can have end-to-end exactly once.

3. The advantage of externalised checkpoints is that they don't get deleted when you cancel a job. This is different from regular checkpoints, which get deleted when you manually cancel a job. There are plans to make all checkpoints "externalised" in Flink 1.4.

4. Yes, you are correct. :-)

Best,
Aljoscha

> On 28. Sep 2017, at 11:46, Federico D'Ambrosio <fe...@smartlab.ws> wrote:
> 
> Hi, I've got a couple of questions concerning the topics in the subject:
> 
>     1. If an operator is getting applied on a keyed stream, do I still have to implement the CheckpointedFunction trait and define the snapshotState and initializeState methods, in order to successfully recover the state from a job failure?
>     
>     2. While using a FlinkKafkaConsumer, enabling checkpointing allows exactly once semantics end to end, provided that the sink is able to guarantee the same. Do I have to set
> setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly once semantics in a sink?
> 
>     3. What are the advantages of externalized checkpoints and which are the cases where I would want to use them?
>   
>     4. Let's suppose a scenario where: checkpointing is enabled every 10 seconds, I have a kafka consumer which is set to start from the latest records, a sink providing at least once semantics and a stateful keyed operator inbetween the consumer and the sink. Is it correct that, in case of task failure, happens the following?
>         - the kafka consumer gets reverted to the latest offset (does it happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
>         - the operator state gets reverted to the latest checkpoint
>         - the sink is stateless so it doesn't really care about what happened
>         - the stream restarts and probably some of the events coming to the sink have already been     processed before
> 
> Thank you for attention,
> Kind regards,
> Federico