Posted to issues@ignite.apache.org by "Ivan Bessonov (Jira)" <ji...@apache.org> on 2022/10/06 07:36:00 UTC

[jira] [Updated] (IGNITE-17083) Base classes for full rebalance procedure

     [ https://issues.apache.org/jira/browse/IGNITE-17083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Bessonov updated IGNITE-17083:
-----------------------------------
    Summary: Base classes for full rebalance procedure  (was: Universal full rebalance procedure for MV storage)

> Base classes for full rebalance procedure
> -----------------------------------------
>
>                 Key: IGNITE-17083
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17083
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Assignee: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
>             Fix For: 3.0.0-alpha6
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The canonical way to make a "full rebalance" in RAFT is to have persisted snapshots of data. This is not always a good idea. First of all, persistent data is already stored somewhere and can be read at any time. Second, for volatile storages this requirement is just absurd.
> So, a "rebalance snapshot" should be streamed from one node to another instead of being written to a storage. What's good is that this approach can be implemented independently from the storage engine (with few adjustments to storage API, of course).
> h2. General idea
> Once a "rebalance snapshot" operation is triggered, we open a special type of cursor from the partition storage, that is able to give us all versioned chains in {_}some fixed order{_}. Every time the next chain has been read, it's remembered as the last read (let's call it\{{ lastRowId}} for now). Then all versions for the specific row id should be sent to receiver node in "Oldest to Newest" order to simplify insertion.
> This works fine without concurrent load. To account for it, we need an additional collection of row ids associated with a snapshot. Let's call it {{overwrittenRowIds}}.
> With this in mind, every write command should look similar to this:
> {noformat}
> for (var rebalanceSnapshot : ongoingRebalanceSnapshots) {
>   try (var lock = rebalanceSnapshot.lock()) {
>     if (rowId.compareTo(rebalanceSnapshot.lastRowId()) <= 0)
>       continue;
>     if (!rebalanceSnapshot.overwrittenRowIds().add(rowId))
>       continue;
>     rebalanceSnapshot.sendRowToReceiver(rowId);
>   }
> }
> // Now the modification can be freely performed.
> // The snapshot itself will skip everything from the "overwrittenRowIds" collection.{noformat}
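> To make the pseudo-code above more concrete, here is a minimal sketch of the per-snapshot state it relies on. The class and member names are hypothetical, and {{RowId}} stands for the storage row id type:
> {code:java}
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.locks.ReentrantLock;
>
> class RebalanceSnapshot {
>     /** Guards lastRowId and coordinates the scan with concurrent write commands. */
>     private final ReentrantLock lock = new ReentrantLock();
>
>     /** Last row id already passed by the snapshot scan; null until the scan starts. */
>     private RowId lastRowId;
>
>     /** Row ids that were modified concurrently and already sent ahead of the scan. */
>     private final Set<RowId> overwrittenRowIds = ConcurrentHashMap.newKeySet();
> }{code}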
> NOTE: the rebalance snapshot scan must also return uncommitted write intentions. Their commits will be replicated later from the RAFT log.
> NOTE: the receiving side will have to rebuild indexes during the rebalance, just like it works in Ignite 2.x.
> NOTE: Technically it is possible to have several nodes entering the cluster that require a full rebalance. So, while triggering a rebalance snapshot cursor, we could wait for other nodes that might want to read the same data and process all of them with a single scan. This is an optimization, obviously.
> h2. Implementation
> The implementation will have to be split into several parts, because we need:
>  * Support for snapshot streaming in RAFT state machine.
>  * Storage API for this type of scan (see the sketch after this list).
>  * Every storage must implement the new scan method.
>  * The streamer itself should be implemented, along with specific logic in write commands.
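> As a rough sketch, the scan-related additions to the storage API could look like this (the interface shape and the {{scanVersions}} method are assumptions; {{closestRowId}} is the name used in the sender pseudo-code below):
> {code:java}
> import java.util.List;
>
> interface MvPartitionStorage {
>     /**
>      * Returns the smallest row id that is greater than or equal to the given one,
>      * or null if the partition has no more rows. Lets the rebalance snapshot
>      * iterate over all version chains in a fixed (row id) order.
>      */
>     RowId closestRowId(RowId lowerBound);
>
>     /**
>      * Returns all versions of the given row, committed and uncommitted,
>      * in "oldest to newest" order, ready to be sent to the receiver.
>      */
>     List<ReadResult> scanVersions(RowId rowId);
> }{code}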
> h4. Sender
> {code:java}
> snapshot.lastRowId = minRowId(partitionId);
> while (snapshot.lastRowId != null) {
>     try (var lock = snapshot.lock()) {
>         snapshot.lastRowId = partition.closestRowId(snapshot.lastRowId);
>         if (snapshot.lastRowId == null) {
>             break;
>         }
>         if (!snapshot.overwrittenRowIds.remove(snapshot.lastRowId)) {
>             snapshot.sendRowToReceiver(snapshot.lastRowId);
>         }
>         snapshot.lastRowId = snapshot.lastRowId.increment();
>     }
> }
> snapshot.finish();{code}
> There's an open question of how exactly we send the data. The description above implies a push model, but that's not necessarily the case.
> From the receiver's perspective, the Flow/Publisher API would fit the code perfectly. Basically, the existing jraft implementation does it the same way.
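> As a minimal sketch of that idea, the sender could wrap a JDK {{SubmissionPublisher}}; the {{RowEntry}} type and the class name here are hypothetical:
> {code:java}
> import java.util.concurrent.Flow;
> import java.util.concurrent.SubmissionPublisher;
>
> class SnapshotStreamer implements AutoCloseable {
>     private final SubmissionPublisher<RowEntry> publisher = new SubmissionPublisher<>();
>
>     /** The receiver subscribes before the scan starts; back-pressure comes from the Flow contract. */
>     void subscribe(Flow.Subscriber<RowEntry> receiver) {
>         publisher.subscribe(receiver);
>     }
>
>     /** Called from the sender loop instead of sendRowToReceiver(rowId). */
>     void send(RowEntry entry) {
>         publisher.submit(entry); // blocks while the receiver's buffer is full
>     }
>
>     /** Completes the stream once the scan is done (snapshot.finish() above). */
>     @Override
>     public void close() {
>         publisher.close();
>     }
> }{code}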
> The only possible issue is the potential uncontrollable growth of the overwrittenRowIds collection and of the queue of _cached_ data that should be sent to the receiver. For these, we can use several different approaches:
>  * just ignore the problem and hope that nodes will handle it
>  * throttle write commands in the state machine (see the sketch after this list)
>  * implement additional push messages to force a receiver to handle these messages, thus (hopefully) reducing the amount of heap that we need
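> The throttling option could be as simple as a per-snapshot permit counter; a minimal sketch, assuming a hypothetical bound on the number of buffered rows:
> {code:java}
> import java.util.concurrent.Semaphore;
>
> class SnapshotBackpressure {
>     private final Semaphore permits;
>
>     SnapshotBackpressure(int maxBufferedRows) {
>         this.permits = new Semaphore(maxBufferedRows);
>     }
>
>     /** Called by a write command before enqueueing a row for the snapshot; blocks while the buffer is full. */
>     void beforeEnqueue() throws InterruptedException {
>         permits.acquire();
>     }
>
>     /** Called after the row has actually been delivered to the receiver. */
>     void afterSend() {
>         permits.release();
>     }
> }{code}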
> h4. Receiver
> No pseudo-code this time. I believe that we should use a special API for insertions: the {{addWrite}}/{{commitWrite}} pair is just not efficient enough.
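> As a rough illustration, such an API could accept an already committed version directly; the interface name, method name and parameter types below are assumptions, not the final storage API:
> {code:java}
> interface PartitionRebalanceWrites {
>     /**
>      * Inserts an already committed version, bypassing the two-phase
>      * addWrite/commitWrite path used by regular transactional writes.
>      */
>     void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp);
> }{code}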



--
This message was sent by Atlassian Jira
(v8.20.10#820010)