Posted to issues@ignite.apache.org by "Kirill Tkalenko (Jira)" <ji...@apache.org> on 2023/01/27 14:12:00 UTC

[jira] [Assigned] (IGNITE-17990) Download RAFT snapshot without deleting original partition data

     [ https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirill Tkalenko reassigned IGNITE-17990:
----------------------------------------

    Assignee: Kirill Tkalenko

> Download RAFT snapshot without deleting original partition data
> ---------------------------------------------------------------
>
>                 Key: IGNITE-17990
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17990
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Assignee: Kirill Tkalenko
>            Priority: Major
>              Labels: ignite-3
>             Fix For: 3.0.0-beta2
>
>
> h3. Note
> This is an umbrella issue. It's split into several smaller issues.
> {color:red}See the first comment; it describes how we end up doing a full rebalance.{color}
> h3. Description
> In the first design, full rebalance is implemented this way:
>  * we drop partition data
>  * we download partition data from the leader
>  * we're done
> There's a problem with this approach: if the download part fails, we lose one follower. This is bad because, technically, the new leader may have more data in the log and could have uploaded it to the follower, but now that makes no sense.
> Not only can it lead to hard-to-catch errors and the introduction of custom work-around code into JRaft, it's also an unconditional data deletion without either explicit user approval or a durably preserved copy of the data.
> Such an implementation is fine for a POC and some tests, but it cannot be allowed in the release version of the product.
> h3. New proposed solution
> As trivial as it may seem, the new solution is to _not delete data_ before the snapshot is fully downloaded and ready for the swap. Why is it trivial? Because this is literally what RAFT demands to be done.
> Of course, there's a {*}but{*}. Snapshot application, once the snapshot is downloaded, should be {{O(1)}} with respect to the number of rows in the partition and the number of transactions in the TX state store. This may not be fully achievable, depending on the implementation that we choose; more on that later.
> The following sections describe all my concerns and possible implementations. Some sections can be skipped while reading, for example, if you're not interested in a specific storage engine but want to read everything else.
> h3. TX state storage
> There's one really good thing about TX state storage: it is not tied to a storage engine, there's only a single RocksDB-based implementation. This makes the following approach possible:
>  * when we stream data, we can write it into an SST file, almost like in the snapshots of the meta-storage and CMG storages
>  * once the snapshot is downloaded, we ingest it into the storage (a rough sketch follows)
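> Just to make this alternative concrete, here is a minimal sketch using the RocksDB Java API ({{SstFileWriter}} plus {{ingestExternalFile}}); class and method names are illustrative, not the actual Ignite code:
> {code:java}
> // Sketch of the SST-based alternative: stream the received TX state entries
> // into an SST file and ingest it into the live RocksDB instance on snapshot load.
> import java.util.List;
> import org.rocksdb.EnvOptions;
> import org.rocksdb.IngestExternalFileOptions;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.SstFileWriter;
>
> class TxStateSstSketch {
>     static void writeAndIngest(RocksDB db, String sstPath, List<byte[][]> sortedEntries) throws RocksDBException {
>         try (Options options = new Options();
>                 EnvOptions envOptions = new EnvOptions();
>                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>             writer.open(sstPath);
>
>             // SST files require keys to be added in ascending order.
>             for (byte[][] entry : sortedEntries) {
>                 writer.put(entry[0], entry[1]);
>             }
>
>             writer.finish();
>         }
>
>         try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
>             // Ask RocksDB to move the file instead of copying it; a copy may still
>             // happen if the file is in another folder or on another device.
>             ingestOptions.setMoveFiles(true);
>
>             db.ingestExternalFile(List.of(sstPath), ingestOptions);
>         }
>     }
> }{code}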
> What I like about this solution is that it's very simple. But there are concerns:
>  * ingesting leads to an additional implicit data flush. Maybe it can be avoided; more on that later
>  * it's not clear whether RocksDB creates a copy of the SST file or not. I would assume that it does, because the file might be in another folder or on another device, for example. Although copying files is fast, it still takes time. Add to this the time required for the flush and we see a problem: the operation may become unnecessarily long
> For these reasons, I don't think that such a solution should be implemented. The point of this description was to show that I thought about this alternative and consciously decided to use another one.
> I believe that the TX state storage should use the same approach as the RocksDB-based partition storage. Its description can be found later in this issue.
> h3. MV storage - Test engine
> The Test engine uses a concurrent skip-list map for MV data and a bunch of other maps for indexes. While a snapshot is being downloaded, we should insert all data into new maps that have the same structure. In the end, we should have two versions of the partition: old and new.
> {{onSnapshotLoad}} should just swap all objects. After that, the old partition data can be cleaned up by the garbage collector.
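> For the Test engine, the whole swap can be as small as the following sketch (the field names are made up for illustration):
> {code:java}
> // Sketch only: the Test MV storage keeps its data in plain in-memory maps,
> // so "loading" a snapshot boils down to swapping the references.
> import java.util.Arrays;
> import java.util.concurrent.ConcurrentNavigableMap;
> import java.util.concurrent.ConcurrentSkipListMap;
>
> class TestMvPartitionSketch {
>     /** Current partition data, visible to readers. */
>     private volatile ConcurrentNavigableMap<byte[], byte[]> rows = newMap();
>
>     /** Data being filled from the downloaded snapshot. */
>     private volatile ConcurrentNavigableMap<byte[], byte[]> rebalanceRows;
>
>     void startRebalance() {
>         rebalanceRows = newMap();
>     }
>
>     void onSnapshotLoad() {
>         // Swap the references; the old maps become unreachable and are garbage collected.
>         rows = rebalanceRows;
>         rebalanceRows = null;
>     }
>
>     private static ConcurrentNavigableMap<byte[], byte[]> newMap() {
>         return new ConcurrentSkipListMap<>(Arrays::compare);
>     }
> }{code}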
> This is a good place to start the implementation. I assume that some new API will be introduced. I have thoughts about it as well; they are described later in the *API* section.
> h3. MV storage - RocksDB engine
> The SST-based approach is described in the *TX state storage* section, where I explain why I don't think it's a good solution. The same reasoning applies here just as effectively. This means that we should write data into the same RocksDB instance. This is a little bit tricky.
> The reason is that all stored data is merged together, and Column Families are shared between different partitions. This makes it harder to find a place to write partition data while the old partition data persists. As a reminder and an example, let's take a look at how data is stored in the row storage:
> {code:java}
> +-------------------+-----+-------------------+
> |    Partition 0    | ... |   Partition MAX   |
> +-------------------+-----+-------------------+
> | Row1 | ... | RowN | ... | Row1 | ... | RowN |
> +-------------------+-----+-------------------+{code}
> Logically, the CF is split into different "blocks", and each block represents a partition. Currently, each partition block is associated with a 2-byte identifier that matches the partition number in Big Endian.
>  
> We could add a new CF with a similar structure and write snapshot data into it, but then the snapshot load operation would require us to move data from one CF to another. The only option that I know of that can do this is SST ingestion, and I already explained why I don't like it.
> This leaves us with the necessity to write data into the same column family. The naturally occurring solution is to assign a new identifier to the "new" version of the partition. This way, replacing the "old" partition with the "new" one would be implemented by replacing "oldPartId" with "newPartId" in the table storage metadata.
> Sounds good. No flush is required, and snapshot loading becomes pretty fast.
> The only thing to keep in mind is that each partition spans multiple column families: row data, hash indexes and a CF for every sorted index.
> When the "old" partition is deleted, we should probably somehow hint to RocksDB that it should compact the affected levels and remove a substantial amount of data from disk. But such an optimization also relates to general partition eviction and is out of scope of this discussion.
> Last point: what are "oldPartId" and "newPartId"?
> Logically, the partition id now becomes a tuple of the partition number and its generation. Physically, it should be represented as an integer, where the lower bits are the generation and the higher bits are the partition number. Is it 2 or 3 bytes? Good question, I'll answer it later.
> {code:java}
> | Partition Number | Generation |{code}
> Every time we need a rebalance, we increase the generation of the current partition. The generation counter wraps around on overflow without affecting the partition number.
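> Purely for illustration, here is how such an id could be packed, assuming a 3-byte prefix (whether it's actually 2 or 3 bytes is the open question above):
> {code:java}
> // Illustrative packing only: 2 bytes of partition number (Big Endian) followed by 1 byte of generation.
> class PartitionGenerationIdSketch {
>     static byte[] packedId(int partitionNumber, int generation) {
>         return new byte[] {(byte) (partitionNumber >>> 8), (byte) partitionNumber, (byte) generation};
>     }
>
>     /** The generation counter wraps around without affecting the partition number. */
>     static int nextGeneration(int generation) {
>         return (generation + 1) & 0xFF;
>     }
> }{code}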
>  
> There are alternatives: we either have N possible generations (rotating, of course) or only 2 (0 and 1). Why is this important?
> Every time a partition is "started", it should, technically, perform a cleanup. Imagine, for example, that we have partition {{{}(23,g){}}}. Then we would have to clean up the following ranges (lower bound inclusive, upper bound exclusive):
>  * for 256 generations - ranges {{(23,0):(23,g)}} and {{(23,g+1):(24,0)}}
>  * for 2 generations - range {{(23,1-g):(23,2-g)}}
> This should be done for all indexes as well. Something tells me that recovery will be somewhat faster in the second case, but is it more convenient? I don't know.
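> For illustration, in the 256-generation variant the start-up cleanup could boil down to two {{deleteRange}} calls per column family; here is a sketch reusing the 3-byte id assumption from the previous snippet:
> {code:java}
> // Sketch of the start-up cleanup for the 256-generation variant.
> // Bounds are lower-inclusive, upper-exclusive, matching the ranges above.
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
>
> class StaleGenerationCleanupSketch {
>     /** Removes every generation of partition {@code p} except the current one, {@code g}. */
>     static void cleanup(RocksDB db, ColumnFamilyHandle cf, int p, int g) throws RocksDBException {
>         int current = (p << 8) | g;       // packed (partition number, generation)
>         int nextPartition = (p + 1) << 8; // first possible key of the next partition
>
>         // (p,0) : (p,g)
>         db.deleteRange(cf, packed(p << 8), packed(current));
>         // (p,g+1) : (p+1,0) - empty when g is the last generation, thanks to the carry.
>         db.deleteRange(cf, packed(current + 1), packed(nextPartition));
>     }
>
>     private static byte[] packed(int id) {
>         return new byte[] {(byte) (id >>> 16), (byte) (id >>> 8), (byte) id};
>     }
> }{code}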
> Lastly, a random thought on the prefix size. Don't we increase the footprint by storing it in every key? Yes and no. Right now the engine is not yet properly tuned, but in the future we may set it up in such a way that RocksDB trims prefixes from the keys, so the prefix is kind of irrelevant. Will we configure the prefix to be the partition id or the pair (partId, rowId)? I don't know yet. Both options look good, but the second may be better. We should do both and benchmark them.
> h3. MV storage - Persistent Page Memory
> This engine is way less complicated, but there are tricky parts as well. First of all, we can't have new partition ids like in the RocksDB engine. It's 2 bytes, period. Too much code depends on this size being exactly two bytes. There's a possibility of reducing the maximum number of partitions to 32 thousand or so, leaving us with a single "free" bit to store the generation, like in the proposed RocksDB implementation.
> Second, unlike RocksDB, each partition is stored in its own set of files. Or, in other words, partitions are completely independent, which greatly simplifies the swapping procedure. I propose the following algorithm:
>  * create new partition generation and upload data into it
>  * checkpoint everything
>  * invalidate all partition pages in page memory (this is {{{}O(1){}}}) for the old generation and close all file descriptors (page-stores)
>  * drop the old partition files
> Local storage recovery should be implemented carefully. I propose having a rebalance marker that shows that a rebalance is in progress. On recovery:
>  * if marker is present, delete "new" partition files
>  * if marker is absent, do nothing
> The marker should contain the generation information, otherwise there might be confusion. The real generation must be easy to determine.
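> A rough sketch of that recovery step follows; the marker and file layout below are made up for illustration and don't match any real on-disk format:
> {code:java}
> // Hypothetical layout: a "rebalance-<partId>" marker containing the generation being
> // downloaded, and "part-<partId>-<generation>.bin" partition files.
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> class RebalanceRecoverySketch {
>     /** Called on node start for every partition of a persistent table. */
>     static void recoverPartition(Path tableDir, int partitionNumber) throws IOException {
>         Path marker = tableDir.resolve("rebalance-" + partitionNumber);
>
>         if (Files.exists(marker)) {
>             // Rebalance was interrupted: drop the half-downloaded "new" generation.
>             int newGeneration = Integer.parseInt(Files.readString(marker).trim());
>
>             Files.deleteIfExists(tableDir.resolve("part-" + partitionNumber + "-" + newGeneration + ".bin"));
>             Files.delete(marker);
>         }
>         // If the marker is absent, there's nothing to do: the "old" generation is the current one.
>     }
> }{code}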
> h3. MV storage - Volatile Page Memory
> In the case of a volatile storage, there are no files to work with, and there's no need for partition generations at all. Instead, we can preserve the old partition "meta": pointers to all trees, free lists or anything else useful. After that, we start writing data into a new partition with a new meta. Pretty much like in the Test storage engine, but offheap.
> When we're done, we can start deleting old partition data using the same mechanism that's used in partition eviction (IGNITE-17833, not implemented yet).
> If snapshot downloading is interrupted, we delete everything that we have already downloaded asynchronously, again reusing the partition eviction code. No memory should be leaked as a result.
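> In code, the idea could look roughly like this (the meta class and method names are hypothetical):
> {code:java}
> // Sketch for the volatile engine: keep a reference to the partition "meta"
> // (tree roots, free lists) and swap it on snapshot load; the replaced meta is
> // then destroyed asynchronously via the same path as partition eviction.
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.atomic.AtomicReference;
>
> class VolatilePartitionSketch {
>     /** Roots of the version-chain tree, free lists, etc. */
>     static class PartitionMeta {
>         CompletableFuture<Void> destroyAsync() {
>             // Would reuse the page-by-page cleanup from partition eviction (IGNITE-17833).
>             return CompletableFuture.completedFuture(null);
>         }
>     }
>
>     private final AtomicReference<PartitionMeta> meta = new AtomicReference<>(new PartitionMeta());
>
>     private volatile PartitionMeta rebalanceMeta;
>
>     void startRebalance() {
>         rebalanceMeta = new PartitionMeta();
>     }
>
>     void onSnapshotLoad() {
>         PartitionMeta old = meta.getAndSet(rebalanceMeta);
>         rebalanceMeta = null;
>
>         old.destroyAsync();
>     }
>
>     void abortRebalance() {
>         PartitionMeta partial = rebalanceMeta;
>         rebalanceMeta = null;
>
>         if (partial != null) {
>             partial.destroyAsync();
>         }
>     }
> }{code}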
> h3. Atomic snapshot load in case of multiple storages
> On every step, every operation may fail. But data consistency should be preserved no matter what. Here, in particular, we need to call two {{onSnapshotLoad}} methods atomically. Their implementations may be very different.
> On a high level, the operation may look like this:
>  * write an operation marker somewhere (table folder, vault, doesn't matter). Before doing so, we need to make sure that data is persisted on disk for both storages. Once the marker is created, there's no way back: the old partition data will be destroyed
>  * call both methods
>  * remove marker when load is completed
> Pretty simple. Just do the same thing on recovery if the marker is present. One thing to keep in mind: {{onSnapshotLoad}} should be idempotent for this to work. If the new partition is already loaded, nothing should break; loading should effectively become a no-op in such a case.
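> The whole procedure fits into a few lines; here's a sketch with hypothetical storage interfaces (the real flush and load methods may look different):
> {code:java}
> // Sketch: atomically "loading" a snapshot into the MV storage and the TX state storage,
> // fenced by a durable marker. The marker is the point of no return.
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
>
> class AtomicSnapshotLoadSketch {
>     interface MvStorage { void flush(); void onSnapshotLoad(); }
>
>     interface TxStateStorage { void flush(); void onSnapshotLoad(); }
>
>     static void loadSnapshot(Path marker, MvStorage mv, TxStateStorage tx) throws IOException {
>         // 1. Make sure the downloaded data is durable in both storages before the point of no return.
>         mv.flush();
>         tx.flush();
>
>         // 2. Once the marker exists, the old partition data is doomed.
>         Files.write(marker, new byte[0], StandardOpenOption.CREATE, StandardOpenOption.SYNC);
>
>         // 3. Both calls must be idempotent: on recovery, we simply repeat them if the marker exists.
>         mv.onSnapshotLoad();
>         tx.onSnapshotLoad();
>
>         // 4. The marker is removed only when both storages have switched to the new data.
>         Files.delete(marker);
>     }
> }{code}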
> h3. API
> I realize that the {{MvPartitionStorage}} interface is slowly becoming a mess. There's not much that we can do about it. But rebalance is a good exception to the rule.
> Basically, we can implement something like this:
> {code:java}
> void performLocalRecovery();
> MvRebalanceDataWriter startDataRebalancing();
> interface MvRebalanceDataWriter {
>     CompletableFuture<?> beforeWritingData();
>     void addCommitted(...);
>     void addUncommitted(...); // Doesn't return the old value, because that's actually pointless during rebalancing
>     // lastAppliedIndex, runConsistently, etc.
>     CompletableFuture<?> afterWritingData();
>     void close();
> }{code}
> I'm not sure at the moment what exactly we do in {{onSnapshotLoad}} and which interfaces it uses. I just hope that this brief description gives you the gist of what I would like to see in the code. I do believe that it will simplify the API at least a little bit.
> What about indexes? That's a good question. I would expect that old objects, created with {{{}getOrCreate*Index{}}}, should still be functional. It may be a pain in the rear; we may have to introduce a default implementation with changeable delegates. It's hard to predict exactly, but this is definitely a part that also requires attention. The same applies to partitions, actually.
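> The "changeable delegates" idea could look roughly like this; the {{IndexStorage}} interface below is a stand-in for the example, not the real Ignite API:
> {code:java}
> // Sketch: handles returned by getOrCreate*Index stay valid across the partition switch
> // because they only hold a mutable delegate that is re-pointed on snapshot load.
> interface IndexStorage {
>     void put(byte[] indexKey, byte[] rowId); // stand-in method, not the real API
> }
>
> class DelegatingIndexStorage implements IndexStorage {
>     private volatile IndexStorage delegate;
>
>     DelegatingIndexStorage(IndexStorage delegate) {
>         this.delegate = delegate;
>     }
>
>     /** Called on snapshot load: re-points previously handed-out index handles to the new partition. */
>     void switchTo(IndexStorage newDelegate) {
>         this.delegate = newDelegate;
>     }
>
>     @Override
>     public void put(byte[] indexKey, byte[] rowId) {
>         delegate.put(indexKey, rowId);
>     }
> }{code}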
> h3. Read Access
> One of the last things to discuss is the possibility of reading data while a rebalance is in progress. As we know, a "full rebalance" can theoretically be performed on a live follower. RAFT works around this fact by allowing reads from the leader only. We don't have such a limitation. Instead, we have problems.
> Technically, it's very easy to allow reads until {{onSnapshotLoad}} happens. Everything's also simple once it has completed. Any read operation during {{onSnapshotLoad}} should wait for its completion.
> That's good for "fast" read operations. But we also have scans. There are two options:
>  * we cancel them
>  * we allow them to continue working seamlessly
> I think the second option is better. When a cursor is notified that the "switch" is happening, it opens a new underlying cursor from the position it previously stopped at. It feels like each storage will have its own version of this mechanism; they may have small differences.
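> A storage-agnostic sketch of such a cursor (all names are illustrative):
> {code:java}
> // Sketch: a scan cursor that survives the partition "switch" by reopening an
> // underlying cursor from the last position it returned.
> import java.util.Iterator;
> import java.util.NoSuchElementException;
> import java.util.function.Function;
>
> class SwitchAwareCursor<K, T> implements Iterator<T> {
>     /** Opens a cursor positioned strictly after the given key, or from the start if the key is null. */
>     interface CursorFactory<K, T> {
>         Iterator<T> openAfter(K lastReturnedKey);
>     }
>
>     private final CursorFactory<K, T> factory;
>     private final Function<T, K> keyExtractor;
>
>     private Iterator<T> delegate;
>     private K lastReturnedKey;
>     private volatile boolean switched;
>
>     SwitchAwareCursor(CursorFactory<K, T> factory, Function<T, K> keyExtractor) {
>         this.factory = factory;
>         this.keyExtractor = keyExtractor;
>         this.delegate = factory.openAfter(null);
>     }
>
>     /** Called by the storage when onSnapshotLoad has swapped the partition data. */
>     void onPartitionSwitch() {
>         switched = true;
>     }
>
>     @Override
>     public boolean hasNext() {
>         if (switched) {
>             // Continue seamlessly on top of the new partition data, from the last returned position.
>             delegate = factory.openAfter(lastReturnedKey);
>             switched = false;
>         }
>
>         return delegate.hasNext();
>     }
>
>     @Override
>     public T next() {
>         if (!hasNext()) {
>             throw new NoSuchElementException();
>         }
>
>         T row = delegate.next();
>         lastReturnedKey = keyExtractor.apply(row);
>
>         return row;
>     }
> }{code}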
> h3. Conclusion
> I know that this is a pretty big task. I don't expect it to be done in one sitting. It should be split into at least 3 issues, probably more.
> This is fine, just don't forget the link to this particular issue, because it has the overall description of what's going on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)