Posted to issues@flink.apache.org by "Dénes Vadász (JIRA)" <ji...@apache.org> on 2016/06/27 20:24:52 UTC

[jira] [Comment Edited] (FLINK-4120) Lightweight fault tolerance through recomputing lost state

    [ https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351739#comment-15351739 ] 

Dénes Vadász edited comment on FLINK-4120 at 6/27/16 8:23 PM:
--------------------------------------------------------------

Whether the state is re-computed or restored from stable storage should be a trade-off controlled by the designer of the pipeline. Whether the input records are persisted before the pipeline ingests them, and how long they are preserved there, are also at the discretion of the designer of the pipeline, so he/she can align these decisions.
I am arguing in favour of making the re-computation of the state possible; not in favour of mandating it.

The use case I have in mind is the correlation of log records to restore the message flow in a distributed computer system. In such a case the typical lifetime of operator state is a few minutes at most. (In fact, we wanted to port to Flink a solution that uses this optimization and was written long before Spark and Flink were born, and discovered that Flink currently lacks it.)

Note that if you want to mix recomputing and restoring operators in a pipeline, the restoring operators have to restore the state they saved during the null-state checkpoint and participate in the re-computation phase in the same way they behave during normal operation, unaffected by the latest checkpoint barrier. This barrier influences the behaviour of sinks only: before the barrier, sinks consume and discard their input (as these records were already emitted before the failure); after the barrier, they emit normally.
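
To illustrate the sink behaviour described above, here is a minimal sketch in plain Python. It is not Flink API: the `Barrier` and `ReplayAwareSink` names, and the idea of passing barriers to the sink as ordinary stream elements, are assumptions made only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass(frozen=True)
class Barrier:
    """Hypothetical marker for a checkpoint barrier travelling in the stream."""
    checkpoint_id: int


@dataclass
class ReplayAwareSink:
    """Hypothetical sink that stays silent until the latest checkpoint's barrier is replayed."""
    latest_checkpoint_id: int
    recovering: bool = True
    emitted: List[Any] = field(default_factory=list)

    def process(self, element: Any) -> None:
        if isinstance(element, Barrier):
            # Only the barrier of the latest checkpoint ends the discard phase;
            # barriers of older checkpoints are ignored during replay.
            if element.checkpoint_id == self.latest_checkpoint_id:
                self.recovering = False
            return
        if self.recovering:
            # This record was already emitted before the failure: consume and discard.
            return
        self.emitted.append(element)


sink = ReplayAwareSink(latest_checkpoint_id=7)
for element in ["a", Barrier(5), "b", Barrier(7), "c", "d"]:
    sink.process(element)
print(sink.emitted)  # ['c', 'd']
```

Operators upstream of the sink need no such switch in this sketch: they process the replayed records exactly as in normal operation.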

I did not quite understand the statement about barriers being non-deterministic; if it is still relevant in view of my explanations, could you please elaborate on it?

Regards
Dénes


> Lightweight fault tolerance through recomputing lost state
> ----------------------------------------------------------
>
>                 Key: FLINK-4120
>                 URL: https://issues.apache.org/jira/browse/FLINK-4120
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Dénes Vadász
>            Priority: Minor
>
> The current fault tolerance mechanism requires that stateful operators write their internal state to stable storage during a checkpoint. 
> This proposal aims at optimizing out this operation in the cases where the operator state can be recomputed from a finite (practically: small) set of source records, and those records are already on checkpoint-aware persistent storage (e.g. in Kafka). 
> The rationale behind the proposal is that the cost of reprocessing is paid only on recovery from (presumably rare) failures, whereas the cost of persisting the state is paid on every checkpoint. Eliminating the need for persistent storage will also simplify system setup and operation.
> In the cases where this optimisation is applicable, the state of the operators can be restored by restarting the pipeline from a checkpoint taken before the pipeline ingested any of the records required for the state re-computation of the operators (call this the "null-state checkpoint"), as opposed to a restart from the "latest checkpoint". 
> The "latest checkpoint" is still relevant for the recovery: the barriers belonging to that checkpoint must be inserted into the source streams at the positions where they were originally inserted. Sinks must discard all records until this barrier reaches them.
> Note the inherent relationship between the "latest" and the "null-state" checkpoints: the pipeline must be restarted from the latter to restore the state at the former.
> For the stateful operators for which this optimization is applicable we can define the notion of "current null-state watermark" as the watermark such that the operator can correctly (re)compute its current state merely from records after this watermark. 
>  
> For the checkpoint coordinator to be able to compute the null-state checkpoint, each stateful operator should report its "current null-state watermark" as part of acknowledging the ongoing checkpoint. The null-state checkpoint of the ongoing checkpoint is the most recent checkpoint preceding all the received null-state watermarks (assuming the pipeline preserves the relative order of barriers and watermarks).
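
The selection rule in the last paragraph of the description can be sketched as follows. This is only an illustration under stated assumptions, not Flink code: barriers and watermarks are modelled as integer positions in one totally ordered stream, a barrier at the same position as a watermark is treated as preceding it, and the function and parameter names are hypothetical.

```python
from typing import Dict, Optional


def null_state_checkpoint(
    barrier_positions: Dict[int, int],
    null_state_watermarks: Dict[str, int],
) -> Optional[int]:
    """Return the id of the most recent checkpoint that precedes every reported watermark.

    barrier_positions maps checkpoint id -> stream position of its barrier.
    null_state_watermarks maps operator name -> position of the null-state
    watermark that the operator reported when acknowledging the checkpoint.
    """
    if not null_state_watermarks:
        return None  # no recomputing operator reported a watermark
    # A checkpoint precedes all watermarks iff it precedes the earliest one.
    earliest = min(null_state_watermarks.values())
    candidates = [cid for cid, pos in barrier_positions.items() if pos <= earliest]
    if not candidates:
        return None  # no retained checkpoint is old enough
    return max(candidates, key=lambda cid: barrier_positions[cid])


barriers = {1: 100, 2: 200, 3: 300, 4: 400}
watermarks = {"join": 350, "window": 260}
print(null_state_checkpoint(barriers, watermarks))  # 2
```

In this example the earliest reported watermark is at position 260, so checkpoint 2 (barrier at 200) is the most recent checkpoint from which every operator can recompute its state.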


