You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/05/28 15:54:00 UTC

[jira] [Commented] (KAFKA-9986) Checkpointing API for State Stores

    [ https://issues.apache.org/jira/browse/KAFKA-9986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118850#comment-17118850 ] 

John Roesler commented on KAFKA-9986:
-------------------------------------

Hi [~nizhikov] ,

Thanks for getting involved in this! It's something I've been wanting to see progress on for at least a year, so I'm really happy to see that you're interested in it, too. I know multiple people who would dearly love to see this feature in Streams.

One high-level request: can we call these things "*Snapshots*", not "Checkpoints" to avoid confusion with respect to the current "checkpoint" concept in Streams?

I'm not sure if you have already done it, but I'd highly recommend that you look at two sources of related work:
 # Other stateful data processing systems. Bulk processing systems may not be that relevant, and Streams has some unique features that may render other stream processing systems also not that relevant, but it's still worth taking the time to understand how systems that _do_ allow remote snapshotting actually manage those snapshots.
 # Other distributed databases. This may seem like a strange statement, but if you squint at it, you'll see that state stores in Streams _are_ distributed databases, following the primary/replica pattern, which use the changelog both for replication and for durability. This means that understanding the best snapshot/restore mechanisms from the distributed database world will be deeply helpful in developing a good design for Streams.

Incidentally, we did similar Related Work research while designing KIP-441 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams),] for very similar reasons. You might want to start by looking at the systems on our list.

 

One thing to bear in mind while you look at related systems is that Streams has one feature that renders it unique in both related domains. Namely, the changelog itself. The changelog topic represents a linearized history of the state store, with the offsets representing unique points in the history of the store. The changelog is stored durably external to Streams, and it's compacted with infinite retention.

This has a few important implications:
 * We do not need snapshot/restore to recover from disasters. Typically, this mechanism is used in data systems to recover from the loss of too many nodes in the cluster. However, Streams is already resilient to the loss of the entire cluster. For us, snapshot/restore is purely an optimization: copying filesystem objects to and from remote blob storage is likely to be faster than replaying the changelog for very large stores.
 * We do not need any special algorithms to keep coherent snapshots. As long as we store the snapshot along with the "checkpoint" information (the changelog topic, partition, and offset), we can replay any subsequent state updates from the changelog and return the system to _exactly_ where it left off processing its inputs. Other systems need to implement some form of the Chandy/Lamport Distributed Snapshot algorithm in order to capture coherent snapshots, but we get it essentially "for free".

 

Another important differentiating factor specifically as you look at distributed databases for comparison is that Streams is more like a distributed database _engine_ than "just" a distributed database itself. Most databases just have one storage format. For example, Cassandra stores its data in SSTables, Elasticsearch uses Lucene indices, etc. These systems can craft their snapshot/restore mechanism in full knowledge of the storage format.

On the other hand, Streams allows you to plug in multiple, custom, storage formats. For an efficient snapshot/restore, my impression is that you really need to deal with the low level format. For example, if we just iterate over a whole RocksDB store to copy it into a flat file for every snapshot, it's going to be _way_ slower and more bloated than if we just directly copy around SST files, and only copy the SST files that changed from previous snapshots.

It seems like we would need to design the system with two components, then. One is a way to keep track of the metadata: which snapshots are stored where, what offset they're at, etc. The other is actually performing the snapshot (and recovery), which would have a separate implementation for each store type. So, we'd have one for in-memory stores and another for rocksdb stores, and if people provide custom stores, they should also be able to implement the snapshotting logic for them.

 

Needless to say, the whole scope of this is quite large, and I think a good approach would be to get a general idea of how we want to structure the whole system, and then just implement a part of it. For example, (if you agree with the two part system I described in the last paragraph) maybe the initial implementation of the snapshot metadata component would only handle local filesystem locations, and maybe we would only implement snapshot/restore for in-memory stores. Maybe we wouldn't even bother with incremental snapshots for the first version of in-memory store snapshotting. As we expand the implementation to cover more of the full scope, we might discover that the design needs to be modified, which is perfectly fine.

 

I hope this helps!

-John

> Checkpointing API for State Stores
> ----------------------------------
>
>                 Key: KAFKA-9986
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9986
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Nikolay Izhikov
>            Priority: Major
>              Labels: need-kip, streams
>
> The parent ticket is KAFKA-3184.
> The goal of this ticket is to provide a general checkpointing API for state stores in Streams (not only for in-memory but also for persistent stores), where the checkpoint location can be either local disks or remote storage. 
> Design scope is primarily on:
>   # the API design for both checkpointing as well as loading checkpoints into the local state stores
>   # the mechanism of the checkpointing, e.g. whether it should be async? whether it should be executed on separate threads? etc. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)