Posted to issues@flink.apache.org by "Gyula Fora (JIRA)" <ji...@apache.org> on 2016/06/27 08:10:52 UTC

[jira] [Commented] (FLINK-4120) Lightweight fault tolerance through recomputing lost state

    [ https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350565#comment-15350565 ] 

Gyula Fora commented on FLINK-4120:
-----------------------------------

Hi,

Thanks for the interesting proposal. A few questions came to mind while trying to understand it:

How would you know which states can be "easily recomputed"? I think in many cases the state is computed over a long period of time (weeks/months), and the overhead of checkpointing is negligible compared to the cost of restreaming the records.

Let's assume you can recompute the state for some operators while others restore from their checkpoints. As you described, the recomputing operators would wait for the latest checkpoint barrier before continuing to process records; however, the barrier positions are not deterministic, so this would not work.

This also assumes a lot about the persistent storage: Kafka, for instance, does not hold the data forever (or necessarily long enough to reach the null-state checkpoint).

Cheers,
Gyula

> Lightweight fault tolerance through recomputing lost state
> ----------------------------------------------------------
>
>                 Key: FLINK-4120
>                 URL: https://issues.apache.org/jira/browse/FLINK-4120
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Dénes Vadász
>
> The current fault tolerance mechanism requires that stateful operators write their internal state to stable storage during a checkpoint. 
> This proposal aims at optimizing out this operation in the cases where the operator state can be recomputed from a finite (practically: small) set of source records, and those records are already on checkpoint-aware persistent storage (e.g. in Kafka). 
> The rationale behind the proposal is that the cost of reprocessing is paid only on recovery from (presumably rare) failures, whereas the cost of persisting the state is paid on every checkpoint. Eliminating the need for persistent storage will also simplify system setup and operation.
> In the cases where this optimization is applicable, the state of the operators can be restored by restarting the pipeline from a checkpoint taken before the pipeline ingested any of the records required for the state re-computation of the operators (call this the "null-state checkpoint"), as opposed to a restart from the "latest checkpoint". 
> The "latest checkpoint" is still relevant for the recovery: the barriers belonging to that checkpoint must be inserted into the source streams in the position they were originally inserted. Sinks must discard all records until this barrier reaches them.
> Note the inherent relationship between the "latest" and the "null-state" checkpoints: the pipeline must be restarted from the latter to restore the state at the former.
> For the stateful operators for which this optimization is applicable we can define the notion of "current null-state watermark" as the watermark such that the operator can correctly (re)compute its current state merely from records after this watermark. 
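> As an illustration, for a window operator this watermark is the stream position just before the first record of its oldest still-open window. A minimal sketch, with stream positions modelled as plain longs (all names are illustrative):
>
>     import java.util.NavigableMap;
>     import java.util.TreeMap;
>
>     public class WindowedOperatorState {
>         // open windows keyed by the stream position of their first record
>         private final NavigableMap<Long, Object> openWindows = new TreeMap<>();
>
>         public long currentNullStateWatermark(long currentPosition) {
>             // with no live state, everything up to the current position can be skipped;
>             // otherwise the replay must include the first record of the oldest open window
>             return openWindows.isEmpty() ? currentPosition : openWindows.firstKey() - 1;
>         }
>     }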
>  
> For the checkpoint coordinator to be able to compute the null-state checkpoint, each stateful operator should report its "current null-state watermark" as part of acknowledging the ongoing checkpoint. The null-state checkpoint of the ongoing checkpoint is the most recent checkpoint preceding all the received null-state watermarks (assuming the pipeline preserves the relative order of barriers and watermarks).
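> A sketch of that coordinator-side computation, assuming completed checkpoints are indexed by a comparable stream position (the types and names are illustrative, not part of any existing coordinator):
>
>     import java.util.Collection;
>     import java.util.Collections;
>     import java.util.Map;
>     import java.util.NavigableMap;
>
>     public final class NullStateCheckpointResolver {
>
>         // completedCheckpoints: stream position of each completed checkpoint -> checkpoint id
>         // reportedWatermarks:   the "current null-state watermark" acknowledged by each stateful operator
>         // returns the id of the null-state checkpoint, or null if none qualifies yet
>         public static Long resolve(NavigableMap<Long, Long> completedCheckpoints,
>                                    Collection<Long> reportedWatermarks) {
>             if (reportedWatermarks.isEmpty()) {
>                 return null; // no stateful operator has acknowledged yet
>             }
>             // the checkpoint must precede every watermark, i.e. the minimum of them
>             long earliest = Collections.min(reportedWatermarks);
>             // most recent completed checkpoint at or before that position
>             Map.Entry<Long, Long> e = completedCheckpoints.floorEntry(earliest);
>             return e == null ? null : e.getValue();
>         }
>     }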



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)