Posted to issues@ignite.apache.org by "Ivan Bessonov (Jira)" <ji...@apache.org> on 2022/10/05 06:54:00 UTC

[jira] [Updated] (IGNITE-17083) Universal full rebalance procedure for MV storage

     [ https://issues.apache.org/jira/browse/IGNITE-17083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Bessonov updated IGNITE-17083:
-----------------------------------
    Description: 
The canonical way to perform a "full rebalance" in RAFT is to have persisted snapshots of data. This is not always a good idea. First of all, persistent data is already stored somewhere and can be read at any time. Second, for volatile storages this requirement is simply absurd.

So, a "rebalance snapshot" should be streamed from one node to another instead of being written to a storage. What's good is that this approach can be implemented independently from the storage engine (with few adjustments to storage API, of course).
h2. General idea

Once a "rebalance snapshot" operation is triggered, we open a special type of cursor from the partition storage, that is able to give us all versioned chains in {_}some fixed order{_}. Every time the next chain has been read, it's remembered as the last read (let's call it\{{ lastRowId}} for now). Then all versions for the specific row id should be sent to receiver node in "Oldest to Newest" order to simplify insertion.

This works fine without concurrent load. To account for it, we need an additional collection of row ids associated with the snapshot. Let's call it {{overwrittenRowIds}}.
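
Purely for illustration, the per-snapshot state described above could be grouped into a holder similar to the sketch below. The class shape and member names are working names from this description, not an existing API, and {{RowId}} refers to the row id type used in the pseudo-code:
{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Working sketch of the state tracked for a single rebalance snapshot.
class RebalanceSnapshot {
    // Last row id processed by the snapshot scan; rows up to and including it
    // have already been handled by the scan.
    volatile RowId lastRowId;

    // Row ids that were sent out of order by write commands; the scan skips them.
    private final Set<RowId> overwrittenRowIds = ConcurrentHashMap.newKeySet();

    private final ReentrantLock mutex = new ReentrantLock();

    // Mutual exclusion between the snapshot scan and write commands,
    // usable in try-with-resources, as in the pseudo-code.
    AutoCloseable lock() {
        mutex.lock();
        return mutex::unlock;
    }

    RowId lastRowId() {
        return lastRowId;
    }

    Set<RowId> overwrittenRowIds() {
        return overwrittenRowIds;
    }

    void sendRowToReceiver(RowId rowId) {
        // Send all versions of the row, oldest to newest, to the receiver node.
    }
}{code}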

With this in mind, every write command should look similar to this:
{noformat}
for (var rebalanceSnapshot : ongoingRebalanceSnapshots) {
  try (var lock = rebalanceSnapshot.lock()) {
    // The snapshot scan has already passed this row, so the write needs no special handling.
    if (rowId <= rebalanceSnapshot.lastRowId())
      continue;

    // The row was already sent out of order by a previous write command.
    if (!rebalanceSnapshot.overwrittenRowIds().add(rowId))
      continue;

    // Send the current (pre-write) versions of the row to the receiver.
    rebalanceSnapshot.sendRowToReceiver(rowId);
  }
}

// Now the modification can be freely performed.
// The snapshot scan itself will skip everything from the "overwrittenRowIds" collection.{noformat}
NOTE: the rebalance snapshot scan must also return uncommitted write intents. Their commits will be replicated later from the RAFT log.

NOTE: the receiving side will have to rebuild indexes during the rebalancing, just like it works in Ignite 2.x.

NOTE: Technically, it is possible to have several nodes entering the cluster that require a full rebalance. So, when triggering a rebalance snapshot cursor, we could wait for other nodes that might want to read the same data and serve all of them with a single scan. This is an optimization, obviously.
h2. Implementation

The implementation will have to be split into several parts, because we need:
 * Support for snapshot streaming in the RAFT state machine.
 * Storage API for this type of scan (a possible shape is sketched right after this list).
 * Every storage must implement the new scan method.
 * The streamer itself should be implemented, along with the specific logic in write commands.
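
A possible shape of the new scan-related storage calls, inferred from the pseudo-code in the "Sender" section; the interface, method and helper type names are illustrative, not a final API:
{code:java}
import java.util.List;

// Illustrative shape of the storage API needed by the rebalance snapshot scan.
interface PartitionSnapshotScan {

    // Closest existing row id that is greater than or equal to the given one,
    // or null if the partition has no more rows.
    RowId closestRowId(RowId lowerBound);

    // All versions of the row chain, oldest to newest,
    // including uncommitted write intents.
    List<RowVersion> allVersions(RowId rowId);

    // Placeholder types for the sketch.
    interface RowId {
        RowId increment();
    }

    interface RowVersion {
    }
}{code}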

h4. Sender
{code:java}
// The scan starts from the smallest possible row id of the partition.
snapshot.lastRowId = minRowId(partitionId);

while (snapshot.lastRowId != null) {
    try (var lock = snapshot.lock()) {
        // Find the closest existing row id, starting from the last processed one.
        snapshot.lastRowId = partition.closestRowId(snapshot.lastRowId);

        if (snapshot.lastRowId == null) {
            break;
        }

        // Skip rows that have already been sent out of order by write commands.
        if (!snapshot.overwrittenRowIds.remove(snapshot.lastRowId)) {
            snapshot.sendRowToReceiver(snapshot.lastRowId);
        }

        snapshot.lastRowId = snapshot.lastRowId.increment();
    }
}

snapshot.finish();{code}
There's an open question of how exactly we send the data. The description above implies a push model, but that's not necessarily the case.

From the receiver's perspective, the Flow/Publisher API would fit the code perfectly. Basically, the existing jraft implementation does it the same way.
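
Purely as an illustration of that model on the receiving side, assuming the streamed rows are exposed through {{java.util.concurrent.Flow}} ({{SnapshotRow}} and {{insertIntoStorage()}} are placeholders, not an existing API):
{code:java}
import java.util.concurrent.Flow;

// Minimal sketch of a receiver consuming the rebalance snapshot as a reactive stream.
class SnapshotRowSubscriber implements Flow.Subscriber<SnapshotRowSubscriber.SnapshotRow> {
    // Placeholder for "row id plus all of its versions, oldest to newest".
    static class SnapshotRow {
    }

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Explicit demand gives the receiver natural backpressure.
        subscription.request(1);
    }

    @Override
    public void onNext(SnapshotRow row) {
        insertIntoStorage(row);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        // Abort the rebalance; the snapshot will have to be retried.
    }

    @Override
    public void onComplete() {
        // The full snapshot has been received; the node can now catch up from the RAFT log.
    }

    private void insertIntoStorage(SnapshotRow row) {
        // Placeholder for inserting all versions of the row into the local partition.
    }
}{code}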

The only possible issue is the potential uncontrollable growth of the overwrittenRowIds collection and of the queue of _cached_ data that should be sent to the receiver. For these, we can use several different approaches (a possible combination of the last two is sketched right after this list):
 * just ignore the problem and hope that nodes will handle it
 * throttle write commands in the state machine
 * implement additional push messages to force the receiver to handle these messages, thus (hopefully) reducing the amount of heap that we need
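
One possible way to combine the last two options, purely as a sketch ({{SnapshotSendThrottle}} and its members are placeholder names, not an existing API):
{code:java}
import java.util.concurrent.Semaphore;

// Illustration of throttling write commands when the queue of rows cached
// for the receiver grows too large.
class SnapshotSendThrottle {
    private static final int MAX_PENDING_ROWS = 10_000;

    // Permits model the remaining capacity of the send queue.
    private final Semaphore pendingRows = new Semaphore(MAX_PENDING_ROWS);

    // Called from a write command before caching a row for the receiver.
    // Blocks the state machine thread while the receiver cannot keep up.
    void enqueueForReceiver(Runnable sendAction) throws InterruptedException {
        pendingRows.acquire();
        sendAction.run();
    }

    // Called when the receiver acknowledges a row (e.g. in response to a push message),
    // freeing capacity in the send queue.
    void onRowAcknowledged() {
        pendingRows.release();
    }
}{code}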

h4. Receiver

No pseudo-code this time. I believe that we should use a special API for insertions. The {{addWrite}}/{{commitWrite}} pair is just not efficient enough.


> Universal full rebalance procedure for MV storage
> -------------------------------------------------
>
>                 Key: IGNITE-17083
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17083
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Assignee: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)