Posted to issues@ignite.apache.org by "Alexander Lapin (Jira)" <ji...@apache.org> on 2022/05/04 17:10:00 UTC

[jira] [Updated] (IGNITE-16668) Design in-memory raft group reconfiguration on node failure

     [ https://issues.apache.org/jira/browse/IGNITE-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Lapin updated IGNITE-16668:
-------------------------------------
    Description: 
If a node storing a partition of an in-memory table fails and leaves the cluster, all data it held is lost. From the partition's point of view, it looks as if the node has left forever.

Although the Raft protocol tolerates the loss of some of the nodes composing a Raft group (partition), for in-memory tables we cannot restore the replica factor because of the in-memory nature of the table.

This means that we need to detect the failure of each node owning a partition and recalculate assignments for the table without preserving the replica factor.
h4. Upd 1:
h4. Problem

By design, raft has several persisted segments, e.g. raft meta (term/committedIndex) and the stable raft log. So, by converting common raft to an in-memory one, it is possible to break some of its invariants. For example, Node C could vote for Candidate A before its restart and then vote for Candidate B after it, because it no longer remembers the first vote. As a result, two leaders can be elected in the same term, which is illegal.
 
!Screenshot from 2022-04-19 11-11-05.png!
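
The double vote described above can be reproduced with a minimal sketch. It is only a toy model under stated assumptions: the class VoterSketch and all of its members are hypothetical names introduced for illustration, not part of Ignite or of any raft library, and the "persisted" vote is simulated by simply keeping the map across restart.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a single raft voter; not the Ignite or JRaft API. */
final class VoterSketch {
    /** True if the vote record survives a restart (simulates persisted raft meta). */
    private final boolean persistsVotes;

    /** Term -> candidate this node voted for in that term. */
    private final Map<Long, String> votedFor = new HashMap<>();

    VoterSketch(boolean persistsVotes) {
        this.persistsVotes = persistsVotes;
    }

    /** Grants the vote unless this node remembers voting for another candidate in the same term. */
    boolean requestVote(long term, String candidate) {
        String previous = votedFor.putIfAbsent(term, candidate);
        return previous == null || previous.equals(candidate);
    }

    /** An in-memory voter forgets its votes on restart; a persistent one keeps them. */
    void restart() {
        if (!persistsVotes) {
            votedFor.clear();
        }
    }

    /** Replays the scenario: vote for A, restart, then receive a vote request from B in the same term. */
    static boolean votesTwiceInSameTerm(boolean persistsVotes) {
        VoterSketch nodeC = new VoterSketch(persistsVotes);
        boolean votedForA = nodeC.requestVote(1L, "A");
        nodeC.restart();
        boolean votedForB = nodeC.requestVote(1L, "B");
        return votedForA && votedForB;
    }

    public static void main(String[] args) {
        System.out.println("persistent voter votes twice: " + votesTwiceInSameTerm(true));  // false
        System.out.println("in-memory voter votes twice: " + votesTwiceInSameTerm(false)); // true
    }
}
```

With the vote record kept, Node C rejects Candidate B; once the record is lost on restart, both candidates receive Node C's vote in the same term.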
 
h4. Solution

To solve the problem described above, the restarting node can be removed from the peers of the corresponding raft group and then added back. The peer removal must be finished before the corresponding raft server node is restarted.
 
  !Screenshot from 2022-04-19 11-12-55.png!
 
However, removing and then re-adding the restarting node is itself tricky. To explain why it is a non-trivial action, it is necessary to outline the main ideas of the rebalance protocol.

Reconfiguration of the raft group is a process driven by changes to the assignments. Each partition has three corresponding sets of assignments stored in the metastore:
 # assignments.stable - current distribution

 # assignments.pending - partition distribution for an ongoing rebalance if any

 # assignments.planned - in some cases it is not possible to cancel a pending rebalance or merge it with a new one. In that case the newly calculated assignments are stored explicitly under the corresponding assignments.planned key. It's worth noting that it doesn't make sense to keep more than one planned rebalance: any newly scheduled one overwrites the existing one.

However, this idea of overwriting the assignments.planned key won't work in the context of an in-memory raft restart, because it is not valid to overwrite a reduction of assignments. Let's illustrate this problem with the following example.
 # In-memory partition p1 is hosted on nodes A, B and C, meaning that p1.assignments.stable=[A,B,C]

 # Let's say that the baseline was changed, resulting in a rebalance on assignments.pending=[A,B,C,*D*]

 # During the non-cancelable phase of [A,B,C]->[A,B,C,D], node C fails and comes back, meaning that we should plan two assignments in turn: [A,B,D] and then [A,B,C,D]. Both must be recorded under the single assignments.planned key, so [A,B,C,D] will overwrite the reduction [A,B,D]. As a result, no actual raft reconfiguration will take place, which is not acceptable.
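
The example above can be replayed with a minimal sketch, assuming the metastore behaves like a plain key-value map. The class PlannedOverwriteSketch and its replay() method are hypothetical names used only for illustration; only the key names come from the description.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical replay of the example; a HashMap stands in for the metastore. */
final class PlannedOverwriteSketch {
    /** Returns the metastore state after node C has failed and come back. */
    static Map<String, List<String>> replay() {
        Map<String, List<String>> metastore = new HashMap<>();

        // Steps 1 and 2: p1 is hosted on A, B, C and a rebalance to [A,B,C,D] is in progress.
        metastore.put("assignments.stable", List.of("A", "B", "C"));
        metastore.put("assignments.pending", List.of("A", "B", "C", "D"));

        // Step 3, node C fails: the reduction is planned.
        metastore.put("assignments.planned", List.of("A", "B", "D"));

        // Step 3, node C comes back: the single planned key is overwritten.
        metastore.put("assignments.planned", List.of("A", "B", "C", "D"));

        return metastore;
    }

    public static void main(String[] args) {
        Map<String, List<String>> metastore = replay();
        // The planned assignments now equal the pending ones, so the reduction [A,B,D] is lost.
        System.out.println(metastore.get("assignments.planned").equals(metastore.get("assignments.pending")));
    }
}
```

Because the planned assignments end up equal to the pending ones, node C is never actually removed from the raft group.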

To overcome this issue, let's introduce two new keys: _assignments.switch.reduce_, which holds the nodes that should be removed, and _assignments.switch.append_, which holds the nodes that should be added back. The following actions are then run:
h5. On in-memory partition restart (or on partition start with cleaned-up PDS):

 
{code:java}
do {
     metastoreInvoke:
         if empty(assignments.switch.reduce) || eq(revision(assignments.switch.reduce), retrievedRevision) // assignments.switch.reduce didn't change
        
} while (!invokeRes);{code}
 

 
h6. as-is:

N/A
h6. to-be:
{code:java}
metastoreInvoke*: // atomic metastore call through multi-invoke api
    // Apply the event only if it is newer than the last applied assignments change.
    if empty(partition.assignments.change.trigger.revision) || partition.assignments.change.trigger.revision < event.revision:
        var assignmentsSwitch = union(assignments.switch<oldVal>, assignments.switch<newVal>)

        if empty(partition.assignments.pending):
            // No rebalance is in progress: start removing the switch nodes right away.
            partition.assignments.pending = subtract(partition.assignments.stable, assignmentsSwitch)
            partition.assignments.switch = assignmentsSwitch
            partition.assignments.change.trigger.revision = event.revision
        else:
            // A rebalance is in progress: only record the switch nodes, they are handled on rebalance done.
            partition.assignments.switch = assignmentsSwitch
            partition.assignments.change.trigger.revision = event.revision
    else:
        skip
{code}
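
The to-be invoke above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class RestartInvokeSketch, its fields and the method onPartitionRestart are hypothetical names, ordinary in-memory sets stand in for the metastore keys, and the atomicity of the real multi-invoke call is not modelled.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the partition keys; not the Ignite metastore API. */
final class RestartInvokeSketch {
    Set<String> stable = new TreeSet<>();
    Set<String> pending = new TreeSet<>();
    Set<String> switchNodes = new TreeSet<>();

    /** Models partition.assignments.change.trigger.revision; null means the key is empty. */
    Long triggerRevision;

    /** Returns false when the event is stale and therefore skipped. */
    boolean onPartitionRestart(Set<String> restartedNodes, long eventRevision) {
        if (triggerRevision != null && triggerRevision >= eventRevision) {
            return false; // skip
        }

        // union(assignments.switch<oldVal>, assignments.switch<newVal>)
        switchNodes.addAll(restartedNodes);

        if (pending.isEmpty()) {
            // No rebalance in progress: pending = subtract(stable, assignmentsSwitch)
            pending = new TreeSet<>(stable);
            pending.removeAll(switchNodes);
        }

        triggerRevision = eventRevision;
        return true;
    }

    public static void main(String[] args) {
        RestartInvokeSketch p1 = new RestartInvokeSketch();
        p1.stable.addAll(Set.of("A", "B", "C"));

        System.out.println(p1.onPartitionRestart(Set.of("C"), 5L)); // true
        System.out.println("pending=" + p1.pending + " switch=" + p1.switchNodes);
        System.out.println(p1.onPartitionRestart(Set.of("C"), 5L)); // false, same revision is skipped
    }
}
```

The revision check makes the call idempotent: replaying the same event does not change the keys a second time.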
h5. On rebalance done
h6. as-is:
{code:java}
metastoreInvoke: // atomic
    partition.assignments.stable = appliedPeers
    if empty(partition.assignments.planned):
        partition.assignments.pending = empty
    else:
        // Promote the planned rebalance to the pending one.
        partition.assignments.pending = partition.assignments.planned
        remove(partition.assignments.planned){code}
h6. to-be:
{code:java}
metastoreInvoke: // atomic
    partition.assignments.stable = appliedPeers

    if !empty(partition.assignments.switch):
        // Switch nodes that are no longer in stable have already been removed.
        var stableSwitchSubtract = subtract(partition.assignments.switch, partition.assignments.stable)
        if !empty(stableSwitchSubtract): // add the removed nodes back
            partition.assignments.pending = union(partition.assignments.stable, stableSwitchSubtract)
            partition.assignments.switch = subtract(partition.assignments.switch, stableSwitchSubtract)
            partition.assignments.change.trigger.revision = event.revision
        else: // all switch nodes are still in stable: remove them
            partition.assignments.pending = subtract(partition.assignments.stable, partition.assignments.switch)
            partition.assignments.change.trigger.revision = event.revision
    else:
        if empty(partition.assignments.planned):
            remove(partition.assignments.pending)
        else:
            partition.assignments.pending = partition.assignments.planned
            remove(partition.assignments.planned){code}
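
To see how the to-be variant handles the [A,B,C]->[A,B,C,D] example, the pseudocode can be traced with a minimal Java sketch. It rests on assumptions: the class RebalanceDoneSketch and its members are hypothetical names, in-memory sets stand in for the metastore keys, and the trigger revision and the atomicity of the invoke are left out.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the partition keys; not the Ignite metastore API. */
final class RebalanceDoneSketch {
    Set<String> stable = new TreeSet<>();
    Set<String> pending = new TreeSet<>();
    Set<String> planned = new TreeSet<>();
    Set<String> switchNodes = new TreeSet<>();

    void onRebalanceDone(Set<String> appliedPeers) {
        stable = new TreeSet<>(appliedPeers);

        if (!switchNodes.isEmpty()) {
            // subtract(switch, stable): switch nodes that have already been removed.
            Set<String> removed = new TreeSet<>(switchNodes);
            removed.removeAll(stable);

            pending = new TreeSet<>(stable);
            if (!removed.isEmpty()) {
                pending.addAll(removed);        // add the removed nodes back
                switchNodes.removeAll(removed);
            } else {
                pending.removeAll(switchNodes); // remove the restarting nodes
            }
        } else if (planned.isEmpty()) {
            pending = new TreeSet<>();
        } else {
            pending = planned;
            planned = new TreeSet<>();
        }
    }

    public static void main(String[] args) {
        RebalanceDoneSketch p1 = new RebalanceDoneSketch();
        p1.stable.addAll(Set.of("A", "B", "C"));
        p1.pending.addAll(Set.of("A", "B", "C", "D"));
        p1.switchNodes.add("C"); // node C restarted during the non-cancelable phase

        p1.onRebalanceDone(Set.of("A", "B", "C", "D"));
        System.out.println("pending=" + p1.pending); // pending=[A, B, D]

        p1.onRebalanceDone(Set.of("A", "B", "D"));
        System.out.println("pending=" + p1.pending); // pending=[A, B, C, D]

        p1.onRebalanceDone(Set.of("A", "B", "C", "D"));
        System.out.println("pending=" + p1.pending); // pending=[]
    }
}
```

In this trace the reduction [A,B,D] is applied before node C is added back, so it is no longer lost to an overwrite.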
It is also possible to update onCommonRebalance in a similar way. However, that is only an optimization, because onRebalanceDone always treats the switch as a higher-priority action than a planned rebalance triggered by a baseline change or a replica factor change.

Besides that, in many cases, especially for in-memory partitions, it is worth removing the leaving node from the raft group peers not on its restart but already on the corresponding networkTopology.onDisappeared() event. However, this is only an optimization as well. For example, for a persisted table with a cleaned-up PDS, it is not known that the PDS was cleaned up until the node performs its self-check on table start-up. Thus, the correctness of the algorithm relies exclusively on the actions taken when the node is restarted - the rest is optimization.

All in all, the general flow on node restart will look as follows:
{code:java}
private void onPartitionStart(Partition p) {
    // The local replica has no usable data: in-memory table or unavailable (e.g. cleaned-up) storage.
    if (p.table().inMemory() || !p.localStorage().isAvailable()) {
        if (p.isMajorityAvailable()) {
            <inline metastoreInvoke*>
            // Do not start corresponding raft server!
        } else if (fullPartitionRestart) {
            commonStart();
        } else {
            // Do nothing.
        }
    } else {
        commonStart();
    }
} {code}

  was:
If a node storing a partition of an in-memory table fails and leaves the cluster all data it had is lost. From the point of view of the partition it looks like as the node is left forever.

Although Raft protocol tolerates leaving some amount of nodes composing Raft group (partition); for in-memory caches we cannot restore replica factor because of in-memory nature of the table.

It means that we need to detect failures of each node owning a partition and recalculate assignments for the table without keeping replica factor.
h4. Upd 1:
h4. Problem

By design raft has several persisted segments, e.g. raft meta (term/committedIndex) and stable raft log. So, by converting common raft to in-memory one it’s possible to break some of it’s invariants. For example Node C could vote for Candidate A before self-restart and vote then for Candidate B after one. As a result two leaders will be elected which is illegal.
 
!Screenshot from 2022-04-19 11-11-05.png!
 
h4. Solution

In order to solve the problem mentioned above it’s possible to remove and then return back the restarting node from the peers of the corresponding raft group. The peer-removal process should be finished before the restarting of the corresponding raft server node.
 
  !Screenshot from 2022-04-19 11-12-55.png!
 
The process of removing and then returning back the restarting node is however itself tricky. And to answer why it’s non-trivial action, it’s necessary to reveal the main ideas of the rebalance protocol.

Reconfiguration of the raft group - is a process driven by the fact of changing the assignments. Each partition has three corresponding sets of assignments stored in the metastore:
 # assignments.stable - current distribution

 # assignments.pending - partition distribution for an ongoing rebalance if any

 # assignments.planned - in some cases it’s not possible to cancel or merge pending rebalance with new one. In that case newly calculated assignments will be stored explicitly with corresponding assignments.planned key. It's worth noting that it doesn't make sense to keep more than one planned rebalance. Any new scheduled one will overwrite already existing.

However such idea of overwriting the assignments.planned key wont work within the context of an in-memory raft restart, because it’s not valid to overwrite the reduction of assignments. Let's illustrate this problem with the following example.
 # In-memory partition p1 is hosted on nodes A, B and C, meaning that p1.assignments.stable=[A,B,C]

 # Let's say that the baseline was changed, resulting in a rebalance on assignments.pending=[A,B,C,*D*]

 # During the non-cancelable phase of [A,B,C]->[A,B,C,D], node C fails and returns back, meaning that we should plan [A,B,D] and [A,B,C,D] assignments. Both must be recorded in the only assignments.planned key meaning that [A,B,C,D] will overwrite reduction [A,B,D], so no actual raft reconfiguration will take place, which is not acceptable.

In order to overcome given issue, let’s introduce new key _assignments.switch_ that will hold nodes that should be removed and then returned back and run following actions:
h5. On in-memory partition restart (or on partition start with cleaned-up PDS):
h6. as-is:

N/A
h6. to-be:
{code:java}
metastoreInvoke*: // atomic metastore call through multi-invoke api
    if empty(partition.assignments.change.trigger.revision) || partition.assignments.change.trigger.revision < event.revision:
        var assignmentsSwitch = union(assignments.switch<oldVal>, assignments.switch<newVal>)
         
        if empty(partition.assignments.pending)
            partition.assignments.pending = substract(partition.assignments.stable, assignmentsSwitch) 
            partition.assignments.switch = assignmentsSwitch
            partition.assignments.change.trigger.revision = event.revision
        else:
            partition.assignments.switch = assignmentsSwitch
            partition.assignments.change.trigger.revision = event.revision
    else:

        skip

{code}
h5. On rebalance done
h6. as-is:
{code:java}
metastoreInvoke: \\ atomic
    partition.assignments.stable = appliedPeers
    if empty(partition.assignments.planned):
        partition.assignments.pending = empty
    else:
        partition.assignments.pending = partition.assignments.planned 
        remove(partition.assignments.planned){code}
h6. to-be:
{code:java}
metastoreInvoke: \\ atomic
    partition.assignments.stable = appliedPeers
    
    if !empty(parition.assignments.switch)
        var stableSwitchSubtract = subtract(parition.assignments.switch, partition.assignments.stable)
        if !empty(stableSwitchSubtract) // return returned node
            partition.assignments.pending = union(partition.assignments.stable, stableSwitchSubtract) 
            partition.assignments.switch = subtract(partition.assignments.switch, stableSwitchSubtract)
            partition.assignments.change.trigger.revision = event.revision
        else // subtract returned node
            partition.assignments.pending = subtract(partition.assignments.stable, parition.assignments.switch)
            partition.assignments.change.trigger.revision = event.revision     
    else
        if empty(partition.assignments.planned):
            remove(partition.assignments.pending)
        else:
            partition.assignments.pending = partition.assignments.planned
            remove(partition.assignments.planned){code}
 It’s also possible to update onCommonRebalance in a similar way, however it’s only an optimization because onRebalanceDone will always consider switch as a higher priority action in comparison to planned rebalance triggered by baseline change or replica factor change.

Besides that, in many cases, especially for in-memory partitions, it is worth removing leaving node from raft group peers not within its restart but during corresponding networkTopology.onDisappeared() event. However It’s only an optimization. E.g. in the case of persisted table with cleaned up PDS it’s not known whether it was cleaned up until node self-check on table start up. Thus, the correctness of the algorithm is achieved due to the actions taken exclusively when the node is restarted - the rest is optimization.

All in all general flow on node restart will look like following:
{code:java}
private void onParitionStart(Partition p) {
        if (p.table().inMememory() || !p.localStorage().isAvailable()) {
            if (p.isMajorityAvailable()) {
                    <inline metaStorageInvoke*>
                    // Do not start corresponding raft server!
            } else if (fullParitionRestart) {
                    commonStart();
            } else {
                    // Do nothing.
            }
        } else {
            commonStart();
        }
} {code}


> Design in-memory raft group reconfiguration on node failure
> -----------------------------------------------------------
>
>                 Key: IGNITE-16668
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16668
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Sergey Chugunov
>            Assignee: Alexander Lapin
>            Priority: Major
>              Labels: ignite-3
>         Attachments: Screenshot from 2022-04-19 11-11-05.png, Screenshot from 2022-04-19 11-12-55-1.png, Screenshot from 2022-04-19 11-12-55.png
>
>


