You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Abhishek R. Singh" <ab...@tetrationanalytics.com> on 2016/05/19 18:54:37 UTC

flink async snapshots

If you can take atomic in-memory copies, then it works (at the cost of doubling your instantaneous memory). For larger state (say rocks DB), won’t you have to stop the world (atomic snapshot) and make a copy? Doesn’t that make it synchronous, instead of background/async?

Sorry Stravros - for bumping into your thread. This should probably have been a new thread (I changed the subject in an attempt to fix up).

-Abhishek-

> On May 19, 2016, at 11:42 AM, Paris Carbone <pa...@kth.se> wrote:
> 
> Hi Abhishek, 
> I don’t see the problem there (also this is unrelated to the snapshotting protocol). 
> Intuitively, if you submit a copy of your state (full or delta) for a snapshot version/epoch to a store backend and validate the full snapshot for that version when you eventually receive the acknowledgements this still works fine. Am I missing something?
> 
>> On 19 May 2016, at 20:36, Abhishek R. Singh <abhishsi@tetrationanalytics.com <ma...@tetrationanalytics.com>> wrote:
>> 
>> I was wondering how checkpoints can be async? Because your state is constantly mutating. You probably need versioned state, or immutable data structs?
>> 
>> -Abhishek-
>> 
>>> On May 19, 2016, at 11:14 AM, Paris Carbone <parisc@kth.se <ma...@kth.se>> wrote:
>>> 
>>> Hi Stavros,
>>> 
>>> Currently, rollback failure recovery in Flink works in the pipeline level, not in the task level (see Millwheel [1]). It further builds on repayable stream logs (i.e. Kafka), thus, there is no need for 3pc or backup in the pipeline sources. You can also check this presentation [2] which explains the basic concepts more in detail I hope. Mind that many upcoming optimisation opportunities are going to be addressed in the not so long-term Flink roadmap.
>>> 
>>> Paris
>>> 
>>> [1] http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf <http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf>
>>> [2] http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha
>>>  <http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha>
>>> 
>>>  <http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha>
>>> 
>>>  <http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha>
>>>> On 19 May 2016, at 19:43, Stavros Kontopoulos <st.kontopoulos@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> Cool thnx. So if a checkpoint expires the pipeline will block or fail in total or only the specific task related to the operator (running along with the checkpoint task) or nothing happens?
>>>> 
>>>> On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <rmetzger@apache.org <ma...@apache.org>> wrote:
>>>> Hi Stravos,
>>>> 
>>>> I haven't implemented our checkpointing mechanism and I didn't participate in the design decisions while implementing it, so I can not compare it in detail to other approaches.
>>>> 
>>>> From a "does it work perspective": Checkpoints are only confirmed if all parallel subtasks successfully created a valid snapshot of the state. So if there is a failure in the checkpointing mechanism, no valid checkpoint will be created. The system will recover from the last valid checkpoint.
>>>> There is a timeout for checkpoints. So if a barrier doesn't pass through the system for a certain period of time, the checkpoint is cancelled. The default timeout is 10 minutes.
>>>> 
>>>> Regards,
>>>> Robert
>>>> 
>>>> 
>>>> On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos <st.kontopoulos@gmail.com <ma...@gmail.com>> wrote:
>>>> Hi,
>>>> 
>>>> I was looking into the flink snapshotting algorithm details also mentioned here:
>>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/ <http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/>
>>>> https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/ <https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/>
>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E <http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E>
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html>
>>>> 
>>>> From other sources i understand that it assumes no failures to work for message delivery or for example a process hanging for ever:
>>>> https://en.wikipedia.org/wiki/Snapshot_algorithm <https://en.wikipedia.org/wiki/Snapshot_algorithm>
>>>> https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/ <https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/>
>>>> 
>>>> So my understanding (maybe wrong) is that this is a solution which seems not to address the fault tolerance issue in a strong manner like for example if it was to use a 3pc protocol for local state propagation and global agreement. I know the latter is not efficient just mentioning it for comparison. 
>>>> 
>>>> How the algorithm behaves in practical terms under the presence of its own failures (this is a background process collecting partial states)? Are there timeouts for reaching a barrier?
>>>> 
>>>> PS. have not looked deep into the code details yet, planning to.
>>>> 
>>>> Best,
>>>> Stavros
>>>> 
>>>> 
>>>> 
>>> 
>> 
> 


Re: flink async snapshots

Posted by Stavros Kontopoulos <st...@gmail.com>.
No  problem ;)

On Thu, May 19, 2016 at 9:54 PM, Abhishek R. Singh <
abhishsi@tetrationanalytics.com> wrote:

> If you can take atomic in-memory copies, then it works (at the cost of
> doubling your instantaneous memory). For larger state (say rocks DB), won’t
> you have to stop the world (atomic snapshot) and make a copy? Doesn’t that
> make it synchronous, instead of background/async?
>
> Sorry Stravros - for bumping into your thread. This should probably have
> been a new thread (I changed the subject in an attempt to fix up).
>
> -Abhishek-
>
> On May 19, 2016, at 11:42 AM, Paris Carbone <pa...@kth.se> wrote:
>
> Hi Abhishek,
> I don’t see the problem there (also this is unrelated to the snapshotting
> protocol).
> Intuitively, if you submit a copy of your state (full or delta) for a
> snapshot version/epoch to a store backend and validate the full snapshot
> for that version when you eventually receive the acknowledgements this
> still works fine. Am I missing something?
>
> On 19 May 2016, at 20:36, Abhishek R. Singh <
> abhishsi@tetrationanalytics.com> wrote:
>
> I was wondering how checkpoints can be async? Because your state is
> constantly mutating. You probably need versioned state, or immutable data
> structs?
>
> -Abhishek-
>
> On May 19, 2016, at 11:14 AM, Paris Carbone <pa...@kth.se> wrote:
>
> Hi Stavros,
>
> Currently, rollback failure recovery in Flink works in the pipeline level,
> not in the task level (see Millwheel [1]). It further builds on repayable
> stream logs (i.e. Kafka), thus, there is no need for 3pc or backup in the
> pipeline sources. You can also check this presentation [2] which explains
> the basic concepts more in detail I hope. Mind that many upcoming
> optimisation opportunities are going to be addressed in the not so
> long-term Flink roadmap.
>
> Paris
>
> [1]
> http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
> [2]
> http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha
>
>
> <http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha>
>
>
> <http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha>
>
> On 19 May 2016, at 19:43, Stavros Kontopoulos <st...@gmail.com>
> wrote:
>
> Cool thnx. So if a checkpoint expires the pipeline will block or fail in
> total or only the specific task related to the operator (running along with
> the checkpoint task) or nothing happens?
>
> On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <rm...@apache.org>
> wrote:
>
>> Hi Stravos,
>>
>> I haven't implemented our checkpointing mechanism and I didn't
>> participate in the design decisions while implementing it, so I can not
>> compare it in detail to other approaches.
>>
>> From a "does it work perspective": Checkpoints are only confirmed if all
>> parallel subtasks successfully created a valid snapshot of the state. So if
>> there is a failure in the checkpointing mechanism, no valid checkpoint will
>> be created. The system will recover from the last valid checkpoint.
>> There is a timeout for checkpoints. So if a barrier doesn't pass through
>> the system for a certain period of time, the checkpoint is cancelled. The
>> default timeout is 10 minutes.
>>
>> Regards,
>> Robert
>>
>>
>> On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos <
>> st.kontopoulos@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I was looking into the flink snapshotting algorithm details also
>>> mentioned here:
>>>
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>>
>>> https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/
>>>
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E
>>>
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html
>>>
>>> From other sources i understand that it assumes no failures to work for
>>> message delivery or for example a process hanging for ever:
>>> https://en.wikipedia.org/wiki/Snapshot_algorithm
>>>
>>> https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/
>>>
>>> So my understanding (maybe wrong) is that this is a solution which seems
>>> not to address the fault tolerance issue in a strong manner like for
>>> example if it was to use a 3pc protocol for local state propagation and
>>> global agreement. I know the latter is not efficient just mentioning it for
>>> comparison.
>>>
>>> How the algorithm behaves in practical terms under the presence of its
>>> own failures (this is a background process collecting partial states)? Are
>>> there timeouts for reaching a barrier?
>>>
>>> PS. have not looked deep into the code details yet, planning to.
>>>
>>> Best,
>>> Stavros
>>>
>>>
>>
>
>
>
>
>

Re: flink async snapshots

Posted by Aljoscha Krettek <al...@apache.org>.
Thats correct. With the fully async option the checkpoints take longer but
you don't impact ongoing processing of elements. With the semi-async method
snapshots take a shorter time but during the synchronous part no element
processing can happen.

On Fri, 20 May 2016 at 15:04 Abhishek Singh <ab...@tetrationanalytics.com>
wrote:

> Yes. Thanks for explaining.
>
> On Friday, May 20, 2016, Ufuk Celebi <uc...@apache.org> wrote:
>
>> On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh
>> <ab...@tetrationanalytics.com> wrote:
>> > If you can take atomic in-memory copies, then it works (at the cost of
>> > doubling your instantaneous memory). For larger state (say rocks DB),
>> won’t
>> > you have to stop the world (atomic snapshot) and make a copy? Doesn’t
>> that
>> > make it synchronous, instead of background/async?
>>
>> Hey Abhishek,
>>
>> that's correct. There are two variants for RocksDB:
>>
>> - semi-async (default): snapshot is taking via RocksDB backup feature
>> to backup to a directory (sync). This is then copied to the final
>> checkpoint location (async, e.g copy to HDFS).
>>
>> - fully-async: snapshot is taking via RocksDB snapshot feature (sync,
>> but no full copy and essentially "free"). With this snapshot we
>> iterate over all k/v-pairs and copy them to the final checkpoint
>> location (async, e.g. copy to HDFS).
>>
>> You enable the second variant via:
>> rocksDbBackend.enableFullyAsyncSnapshots();
>>
>> This is only part of the 1.1-SNAPSHOT version though.
>>
>> I'm not too familiar with the performance characteristics of both
>> variants, but maybe Aljoscha can chime in.
>>
>> Does this clarify things for you?
>>
>> – Ufuk
>>
>

Re: flink async snapshots

Posted by Abhishek Singh <ab...@tetrationanalytics.com>.
Yes. Thanks for explaining.

On Friday, May 20, 2016, Ufuk Celebi <uc...@apache.org> wrote:

> On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh
> <abhishsi@tetrationanalytics.com <javascript:;>> wrote:
> > If you can take atomic in-memory copies, then it works (at the cost of
> > doubling your instantaneous memory). For larger state (say rocks DB),
> won’t
> > you have to stop the world (atomic snapshot) and make a copy? Doesn’t
> that
> > make it synchronous, instead of background/async?
>
> Hey Abhishek,
>
> that's correct. There are two variants for RocksDB:
>
> - semi-async (default): snapshot is taking via RocksDB backup feature
> to backup to a directory (sync). This is then copied to the final
> checkpoint location (async, e.g copy to HDFS).
>
> - fully-async: snapshot is taking via RocksDB snapshot feature (sync,
> but no full copy and essentially "free"). With this snapshot we
> iterate over all k/v-pairs and copy them to the final checkpoint
> location (async, e.g. copy to HDFS).
>
> You enable the second variant via:
> rocksDbBackend.enableFullyAsyncSnapshots();
>
> This is only part of the 1.1-SNAPSHOT version though.
>
> I'm not too familiar with the performance characteristics of both
> variants, but maybe Aljoscha can chime in.
>
> Does this clarify things for you?
>
> – Ufuk
>

Re: flink async snapshots

Posted by Ufuk Celebi <uc...@apache.org>.
On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh
<ab...@tetrationanalytics.com> wrote:
> If you can take atomic in-memory copies, then it works (at the cost of
> doubling your instantaneous memory). For larger state (say rocks DB), won’t
> you have to stop the world (atomic snapshot) and make a copy? Doesn’t that
> make it synchronous, instead of background/async?

Hey Abhishek,

that's correct. There are two variants for RocksDB:

- semi-async (default): snapshot is taking via RocksDB backup feature
to backup to a directory (sync). This is then copied to the final
checkpoint location (async, e.g copy to HDFS).

- fully-async: snapshot is taking via RocksDB snapshot feature (sync,
but no full copy and essentially "free"). With this snapshot we
iterate over all k/v-pairs and copy them to the final checkpoint
location (async, e.g. copy to HDFS).

You enable the second variant via: rocksDbBackend.enableFullyAsyncSnapshots();

This is only part of the 1.1-SNAPSHOT version though.

I'm not too familiar with the performance characteristics of both
variants, but maybe Aljoscha can chime in.

Does this clarify things for you?

– Ufuk