Posted to commits@ignite.apache.org by sa...@apache.org on 2022/06/08 14:56:40 UTC

[ignite-3] branch main updated: IGNITE-17013 added movePartition method to the TableManager with retry logic for exceptional cases (#851)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 073dd6484 IGNITE-17013 added movePartition method to the TableManager with retry logic for exceptional cases (#851)
073dd6484 is described below

commit 073dd6484b3fa1da86c62e84bd87bd5d0c99dc24
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed Jun 8 17:56:35 2022 +0300

    IGNITE-17013 added movePartition method to the TableManager with retry logic for exceptional cases (#851)
---
 .../ignite/internal/util/IgniteSpinBusyLock.java   |   2 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   2 +-
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |  37 +++---
 .../raft/jraft/core/RaftGroupServiceTest.java      |  11 +-
 .../apache/ignite/raft/jraft/test/TestUtils.java   |  12 ++
 .../internal/table/distributed/TableManager.java   |  50 +++++++-
 .../raft/RebalanceRaftGroupEventsListener.java     |  35 ++---
 .../ignite/internal/utils/RebalanceUtil.java       |  11 ++
 .../table/{ => distributed}/TableManagerTest.java  | 142 ++++++++++++++++++++-
 9 files changed, 242 insertions(+), 60 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
index e42ebca70..1a807631e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
  *
  * <p>For example, there may be a manager that have different threads for some purposes and the manager must not be stopped while at least a
  * single thread is in "busy" state. In this situation each thread must enter to "busy" state calling method {@link #enterBusy()} in
- * critical pieces of code which, i.e. use grid kernal functionality, notifying that the manager and the whole grid kernal cannot be stopped
+ * critical pieces of code, notifying that the manager and the whole Ignite node cannot be stopped
  * while it's in progress. Once the activity is done, the thread should leave "busy" state calling method {@link #leaveBusy()}. The manager
  * itself, when stopping, should call method {@link #block} that blocks till all activities leave "busy" state.
  *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 80bc52e72..962fbfa90 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2536,7 +2536,7 @@ public class NodeImpl implements Node, RaftServerService {
             }
             return;
         }
-        // Return immediately when the new peers equals to current configuration
+        // Return immediately when the new peers are equal to the current configuration
         if (this.conf.getConf().equals(newConf)) {
             Closure newDone = (Status status) -> {
                 // doOnNewPeersConfigurationApplied should be called, otherwise we could lose the callback invocation.
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 3d634726e..8ea0d0f0b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -68,7 +69,6 @@ import org.apache.ignite.raft.jraft.rpc.ActionRequest;
 import org.apache.ignite.raft.jraft.rpc.ActionResponse;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
-import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.jetbrains.annotations.NotNull;
 
@@ -359,8 +359,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     @Override public CompletableFuture<Void> changePeersAsync(List<Peer> peers, long term) {
         Peer leader = this.leader;
 
-        if (leader == null)
+        if (leader == null) {
             return refreshLeader().thenCompose(res -> changePeersAsync(peers, term));
+        }
 
         List<String> peersToChange = peers.stream().map(p -> PeerId.fromPeer(p).toString())
                 .collect(Collectors.toList());
@@ -373,7 +374,17 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
 
-        return fut.thenRun(() -> {});
+        return fut.handle((resp, err) -> {
+            // We expect that all raft-related errors will be handled by sendWithRetry, meaning that
+            // such responses will trigger a retry of the original request.
+            assert !(resp instanceof RpcRequests.ErrorResponse);
+
+            if (err != null) {
+                return CompletableFuture.<Void>failedFuture(err);
+            }
+
+            return CompletableFuture.<Void>completedFuture(null);
+        }).thenCompose(Function.identity());
     }
 
     /** {@inheritDoc} */
@@ -645,7 +656,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
      * @param t The throwable.
      * @return {@code True} if this is a recoverable exception.
      */
-    private boolean recoverable(Throwable t) {
+    private static boolean recoverable(Throwable t) {
         if (t instanceof ExecutionException || t instanceof CompletionException) {
             t = t.getCause();
         }
@@ -705,22 +716,4 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         return res;
     }
-
-    /**
-     * Convert list of {@link PeerId} to list of {@link Peer}.
-     *
-     * @param peers List of {@link PeerId}
-     * @return List of {@link Peer}
-     */
-    private List<Peer> convertPeerIdList(List<PeerId> peers) {
-        if (peers == null)
-            return Collections.emptyList();
-
-        List<Peer> res = new ArrayList<>(peers.size());
-
-        for (PeerId peerId: peers)
-            res.add(peerFromPeerId(peerId));
-
-        return res;
-    }
 }
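
For readers following the changePeersAsync change above: the new return statement uses the handle(...).thenCompose(Function.identity()) idiom, where the handle callback sees either a response or a throwable and maps both cases to a new CompletableFuture, which thenCompose then flattens. A minimal, self-contained sketch of the idiom (class and method names here are illustrative only, not part of this commit):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    class HandleComposeSketch {
        /** Maps any future to either a failed or a successfully completed Void future inside one callback. */
        static CompletableFuture<Void> toVoid(CompletableFuture<?> fut) {
            return fut
                    .handle((resp, err) -> err != null
                            // Propagate the original failure explicitly.
                            ? CompletableFuture.<Void>failedFuture(err)
                            // Drop the payload and complete normally.
                            : CompletableFuture.<Void>completedFuture(null))
                    // handle() produced a nested future; flatten it.
                    .thenCompose(Function.identity());
        }
    }
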
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
index 2650c193a..732bea151 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.core;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -877,16 +878,6 @@ public class RaftGroupServiceTest {
 
     }
 
-    /**
-     * Convert list of {@link Peer} to list of string representations.
-     *
-     * @param peers List of {@link Peer}
-     * @return List of string representations.
-     */
-    private List<String> peersToIds(List<Peer> peers) {
-        return peers.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
-    }
-
     /** */
     private static class TestCommand implements WriteCommand {
     }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index 786cecbe0..3224fc9e4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -215,4 +217,14 @@ public class TestUtils {
 
         return (RpcClientEx) rpcService.getRpcClient();
     }
+
+    /**
+     * Convert list of {@link Peer} to list of string representations.
+     *
+     * @param peers List of {@link Peer}
+     * @return List of string representations.
+     */
+    public static List<String> peersToIds(List<Peer> peers) {
+        return peers.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index eb8522c31..dbe8baecd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -28,6 +28,7 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_
 import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
 import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
 import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.recoverable;
 import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
 import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssignmentsKeys;
 
@@ -46,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -468,7 +470,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     partitionRaftGroupName(tblId, partId),
                                     partId,
                                     busyLock,
-                                    () -> internalTbl.partitionRaftGroupService(partId),
+                                    movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
                                     rebalanceScheduler)
                     ).thenAccept(
                             updatedRaftGroupService -> ((InternalTableImpl) internalTbl)
@@ -1252,7 +1254,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             partId,
                             part,
                             busyLock,
-                            () -> tbl.internalTable().partitionRaftGroupService(part),
+                            movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
                             rebalanceScheduler);
 
                     // Stable assignments from the meta store, which revision is bounded by the current pending event.
@@ -1362,6 +1364,50 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         });
     }
 
+    /**
+     * Performs {@link RaftGroupService#changePeersAsync(java.util.List, long)} on a provided raft group service of a partition, so nodes
+     * of the corresponding raft group can be reconfigured.
+     * A retry mechanism is applied to repeat {@link RaftGroupService#changePeersAsync(java.util.List, long)} if the previous
+     * attempt failed with an exception.
+     *
+     * @param raftGroupServiceSupplier Raft group service of a partition.
+     * @return Function which performs {@link RaftGroupService#changePeersAsync(java.util.List, long)}.
+     */
+    BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartition(Supplier<RaftGroupService> raftGroupServiceSupplier) {
+        return (List<Peer> peers, Long term) -> {
+            if (!busyLock.enterBusy()) {
+                throw new IgniteInternalException(new NodeStoppingException());
+            }
+            try {
+                return raftGroupServiceSupplier.get().changePeersAsync(peers, term).handleAsync((resp, err) -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(new NodeStoppingException());
+                    }
+                    try {
+                        if (err != null) {
+                            if (recoverable(err)) {
+                                LOG.warn("Recoverable error received during changePeersAsync invocation, retrying", err);
+                            } else {
+                                // TODO: Ideally, rebalance, which has initiated this invocation should be canceled,
+                                // TODO: https://issues.apache.org/jira/browse/IGNITE-17056
+                                // TODO: Also it might be reasonable to delegate such exceptional cases to a general failure handler.
+                                // TODO: At the moment, we repeat such intents as well.
+                                LOG.error("Unrecoverable error received during changePeersAsync invocation, retrying", err);
+                            }
+                            return movePartition(raftGroupServiceSupplier).apply(peers, term);
+                        }
+
+                        return CompletableFuture.<Void>completedFuture(null);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                }, rebalanceScheduler).thenCompose(Function.identity());
+            } finally {
+                busyLock.leaveBusy();
+            }
+        };
+    }
+
     /**
      * Gets a direct accessor for the configuration distributed property.
      * If the metadata access only locally configured the method will return local property accessor.
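
The movePartition method added above returns a BiFunction that wraps RaftGroupService#changePeersAsync and, on any failure, issues another attempt by re-applying itself on the rebalance scheduler. Stripped of the busy-lock handling and logging, the retry shape looks roughly like the following sketch (GroupService and its simplified String-based signature are stand-ins for illustration, not the real interface):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.BiFunction;
    import java.util.function.Function;
    import java.util.function.Supplier;

    class MovePartitionSketch {
        /** Simplified stand-in for the raft group service of a partition. */
        interface GroupService {
            CompletableFuture<Void> changePeersAsync(List<String> peers, long term);
        }

        /** Returns a function that keeps re-issuing changePeersAsync until an attempt succeeds. */
        static BiFunction<List<String>, Long, CompletableFuture<Void>> movePartition(Supplier<GroupService> svc) {
            return (peers, term) -> svc.get()
                    .changePeersAsync(peers, term)
                    .handle((res, err) -> err == null
                            ? CompletableFuture.<Void>completedFuture(null)
                            // Both recoverable and unrecoverable errors currently lead to another attempt.
                            : movePartition(svc).apply(peers, term))
                    .thenCompose(Function.identity());
        }
    }

With this in place, RebalanceRaftGroupEventsListener only calls movePartitionFn.apply(peers, term) and no longer owns any retry handling itself, as the listener diff below shows.
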
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index f625fed47..d8df8b066 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -36,7 +36,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
+import java.util.function.BiFunction;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -51,7 +51,6 @@ import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.RaftError;
@@ -82,8 +81,8 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
     /** Executor for scheduling rebalance retries. */
     private final ScheduledExecutorService rebalanceScheduler;
 
-    /** Supplier of client for raft group of rebalance listener. */
-    private final Supplier<RaftGroupService> raftGroupServiceSupplier;
+    /** Function that performs a reconfiguration of a raft group of a partition. */
+    private final BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn;
 
     /** Attempts to retry the current rebalance in case of errors. */
     private final AtomicInteger rebalanceAttempts =  new AtomicInteger(0);
@@ -109,14 +108,14 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
             String partId,
             int partNum,
             IgniteSpinBusyLock busyLock,
-            Supplier<RaftGroupService> raftGroupServiceSupplier,
+            BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn,
             ScheduledExecutorService rebalanceScheduler) {
         this.metaStorageMgr = metaStorageMgr;
         this.tblConfiguration = tblConfiguration;
         this.partId = partId;
         this.partNum = partNum;
         this.busyLock = busyLock;
-        this.raftGroupServiceSupplier = raftGroupServiceSupplier;
+        this.movePartitionFn = movePartitionFn;
         this.rebalanceScheduler = rebalanceScheduler;
     }
 
@@ -136,18 +135,15 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
                 try {
                     rebalanceAttempts.set(0);
 
-                    metaStorageMgr.get(pendingPartAssignmentsKey(partId))
-                            .thenCompose(pendingEntry -> {
-                                if (!pendingEntry.empty()) {
-                                    List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+                    Entry pendingEntry = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get();
 
-                                    return raftGroupServiceSupplier.get().changePeersAsync(clusterNodesToPeers(pendingNodes), term);
-                                } else {
-                                    return CompletableFuture.completedFuture(null);
-                                }
-                            }).get();
+                    if (!pendingEntry.empty()) {
+                        List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+
+                        movePartitionFn.apply(clusterNodesToPeers(pendingNodes), term).join();
+                    }
                 } catch (InterruptedException | ExecutionException e) {
-                    // TODO: IGNITE-17013 errors during this call should be handled by retry logic
+                    // TODO: IGNITE-14693
                     LOG.error("Couldn't start rebalance for partition {} of table {} on new elected leader for term {}",
                             e, partNum, tblConfiguration.name().value(), term);
                 } finally {
@@ -232,10 +228,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
             LOG.info("Started {} attempt to retry the current rebalance for the partId = {}.", rebalanceAttempts.get(), partId);
 
             try {
-                raftGroupServiceSupplier.get().changePeersAsync(peerIdsToPeers(peers), term).get();
-            } catch (InterruptedException | ExecutionException e) {
-                // TODO: IGNITE-17013 errors during this call should be handled by retry logic
-                LOG.error("Error during the rebalance retry for the partId = {}", e, partId);
+                movePartitionFn.apply(peerIdsToPeers(peers), term).join();
             } finally {
                 busyLock.leaveBusy();
             }
@@ -290,7 +283,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
 
             rebalanceAttempts.set(0);
         } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-17013 errors during this call should be handled by retry logic
+            // TODO: IGNITE-14693
             LOG.error("Couldn't commit new partition configuration to metastore for table = {}, partition = {}",
                     e, tblConfiguration.name(), partNum);
         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 90caa2bdd..47e253746 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -169,4 +169,15 @@ public class RebalanceUtil {
 
         return Integer.parseInt(strKey.substring(strKey.indexOf("_part_") + "_part_".length()));
     }
+
+    /**
+     * Checks if an error is recoverable, so we can retry a rebalance intent.
+     *
+     * @param t The throwable.
+     * @return {@code True} if this is a recoverable exception.
+     */
+    public static boolean recoverable(Throwable t) {
+        // As long as we don't have a general failure handler, we assume that all errors are recoverable.
+        return true;
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
similarity index 81%
rename from modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
rename to modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 835df74c3..311516704 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -15,8 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.table.distributed;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -29,7 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
@@ -45,10 +51,16 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
@@ -82,19 +94,27 @@ import org.apache.ignite.internal.storage.pagememory.PageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageChange;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageConfigurationSchema;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.CliRequests;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
 import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.schema.definition.ColumnType;
 import org.apache.ignite.schema.definition.TableDefinition;
@@ -157,6 +177,14 @@ public class TableManagerTest extends IgniteAbstractTest {
     @Mock
     MetaStorageManager msm;
 
+    /** Mock messaging service. */
+    @Mock
+    private MessagingService messagingService;
+
+    /** Mock cluster. */
+    @Mock
+    private ClusterService cluster;
+
     /**
      * Revision listener holder. It uses for the test configurations:
      * <ul>
@@ -229,7 +257,9 @@ public class TableManagerTest extends IgniteAbstractTest {
         tblManagerFut.join().beforeNodeStop();
         tblManagerFut.join().stop();
 
-        dsm.stop();
+        if (dsm != null) {
+            dsm.stop();
+        }
 
         sm.stop();
     }
@@ -612,6 +642,112 @@ public class TableManagerTest extends IgniteAbstractTest {
         return tbl2;
     }
 
+    /**
+     * Tests that {@link RaftGroupServiceImpl#changePeersAsync(java.util.List, long)} is retried after exceptions.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testChangePeersAsyncRetryLogic() throws Exception {
+        RaftMessagesFactory factory = new RaftMessagesFactory();
+
+        List<Peer> nodes = Stream.of(20000, 20001, 20002)
+                .map(port -> new NetworkAddress("localhost", port))
+                .map(Peer::new)
+                .collect(Collectors.toUnmodifiableList());
+
+        int timeout = 1000;
+
+        int delay = 200;
+
+        Peer leader = nodes.get(0);
+
+        when(cluster.messagingService()).thenReturn(messagingService);
+
+        TableManager tableManager = createTableManager(tblManagerFut, false);
+
+        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME));
+
+        String groupId = "test";
+
+        List<String> shrunkPeers = peersToIds(nodes.subList(0, 1));
+
+        List<String> extendedPeers = peersToIds(nodes);
+
+        AtomicLong firstInvocationOfChangePeersAsync = new AtomicLong(0L);
+
+        AtomicInteger counter = new AtomicInteger(0);
+
+        when(messagingService.invoke(any(NetworkAddress.class),
+                eq(factory.changePeersAsyncRequest()
+                        .newPeersList(shrunkPeers)
+                        .term(1L)
+                        .groupId(groupId).build()), anyLong()))
+                .then(invocation -> {
+                    if (firstInvocationOfChangePeersAsync.get() == 0) {
+                        firstInvocationOfChangePeersAsync.set(System.currentTimeMillis());
+                        return failedFuture(new TimeoutException());
+                    } else {
+                        if (firstInvocationOfChangePeersAsync.get() + timeout < System.currentTimeMillis()) {
+                            // Retry happened: a new changePeersAsync was issued.
+                            counter.incrementAndGet();
+
+                            return completedFuture(factory.changePeersAsyncResponse().newPeersList(extendedPeers).build());
+                        }
+                    }
+
+                    return failedFuture(new TimeoutException());
+                });
+
+        when(messagingService.invoke(any(NetworkAddress.class), any(CliRequests.GetLeaderRequest.class), anyLong()))
+                .then(invocation -> {
+                    PeerId leader0 = PeerId.fromPeer(leader);
+
+                    Object resp = leader0 == null
+                            ? factory.errorResponse().errorCode(RaftError.EPERM.getNumber()).build()
+                            : factory.getLeaderResponse().leaderId(leader0.toString()).currentTerm(1L).build();
+
+                    return completedFuture(resp);
+                });
+
+        RaftGroupService service = RaftGroupServiceImpl.start(groupId, cluster, factory, timeout, nodes.subList(0, 2),
+                true, delay, executor).get(3, TimeUnit.SECONDS);
+
+        tableManager.movePartition(() -> service).apply(nodes.subList(0, 1), 1L).join();
+
+        assertEquals(1, counter.get());
+
+        AtomicLong secondInvocationOfChangePeersAsync = new AtomicLong(0L);
+
+        when(messagingService.invoke(any(NetworkAddress.class),
+                eq(factory.changePeersAsyncRequest()
+                        .newPeersList(shrunkPeers)
+                        .term(1L)
+                        .groupId(groupId).build()), anyLong()))
+                .then(invocation -> {
+                    if (secondInvocationOfChangePeersAsync.get() == 0) {
+                        secondInvocationOfChangePeersAsync.set(System.currentTimeMillis());
+
+                        return failedFuture(new NullPointerException());
+                    } else {
+                        if (secondInvocationOfChangePeersAsync.get() + timeout < System.currentTimeMillis()) {
+                            // Retry happened: a new changePeersAsync was issued.
+                            counter.incrementAndGet();
+
+                            return completedFuture(factory.changePeersAsyncResponse().newPeersList(extendedPeers).build());
+                        }
+                    }
+
+                    return failedFuture(new NullPointerException());
+                });
+
+        tableManager.movePartition(() -> service).apply(nodes.subList(0, 1), 1L).join();
+
+        assertEquals(2, counter.get());
+
+        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+    }
+
     /**
      * Creates Table manager.
      *