You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2022/06/08 14:56:40 UTC
[ignite-3] branch main updated: IGNITE-17013 added movePartition method to the TableManager with retry logic for exceptional cases (#851)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 073dd6484 IGNITE-17013 added movePartition method to the TableManager with retry logic for exceptional cases (#851)
073dd6484 is described below
commit 073dd6484b3fa1da86c62e84bd87bd5d0c99dc24
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed Jun 8 17:56:35 2022 +0300
IGNITE-17013 added movePartition method to the TableManager with retry logic for exceptional cases (#851)
---
.../ignite/internal/util/IgniteSpinBusyLock.java | 2 +-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 2 +-
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 37 +++---
.../raft/jraft/core/RaftGroupServiceTest.java | 11 +-
.../apache/ignite/raft/jraft/test/TestUtils.java | 12 ++
.../internal/table/distributed/TableManager.java | 50 +++++++-
.../raft/RebalanceRaftGroupEventsListener.java | 35 ++---
.../ignite/internal/utils/RebalanceUtil.java | 11 ++
.../table/{ => distributed}/TableManagerTest.java | 142 ++++++++++++++++++++-
9 files changed, 242 insertions(+), 60 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
index e42ebca70..1a807631e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
*
* <p>For example, there may be a manager that have different threads for some purposes and the manager must not be stopped while at least a
* single thread is in "busy" state. In this situation each thread must enter to "busy" state calling method {@link #enterBusy()} in
- * critical pieces of code which, i.e. use grid kernal functionality, notifying that the manager and the whole grid kernal cannot be stopped
+ * critical pieces of code, notifying that the manager and the whole ignite node cannot be stopped
* while it's in progress. Once the activity is done, the thread should leave "busy" state calling method {@link #leaveBusy()}. The manager
* itself, when stopping, should call method {@link #block} that blocks till all activities leave "busy" state.
*
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 80bc52e72..962fbfa90 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2536,7 +2536,7 @@ public class NodeImpl implements Node, RaftServerService {
}
return;
}
- // Return immediately when the new peers equals to current configuration
+ // Return immediately when the new peers are equal to the current configuration
if (this.conf.getConf().equals(newConf)) {
Closure newDone = (Status status) -> {
// doOnNewPeersConfigurationApplied should be called, otherwise we could lose the callback invocation.
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 3d634726e..8ea0d0f0b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -68,7 +69,6 @@ import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
-import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.jetbrains.annotations.NotNull;
@@ -359,8 +359,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public CompletableFuture<Void> changePeersAsync(List<Peer> peers, long term) {
Peer leader = this.leader;
- if (leader == null)
+ if (leader == null) {
return refreshLeader().thenCompose(res -> changePeersAsync(peers, term));
+ }
List<String> peersToChange = peers.stream().map(p -> PeerId.fromPeer(p).toString())
.collect(Collectors.toList());
@@ -373,7 +374,17 @@ public class RaftGroupServiceImpl implements RaftGroupService {
sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
- return fut.thenRun(() -> {});
+ return fut.handle((resp, err) -> {
+ // We expect that all raft related errors are handled by sendWithRetry, which means that
+ // such responses initiate a retry of the original request.
+ assert !(resp instanceof RpcRequests.ErrorResponse);
+
+ if (err != null) {
+ return CompletableFuture.<Void>failedFuture(err);
+ }
+
+ return CompletableFuture.<Void>completedFuture(null);
+ }).thenCompose(Function.identity());
}
/** {@inheritDoc} */
@@ -645,7 +656,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param t The throwable.
* @return {@code True} if this is a recoverable exception.
*/
- private boolean recoverable(Throwable t) {
+ private static boolean recoverable(Throwable t) {
if (t instanceof ExecutionException || t instanceof CompletionException) {
t = t.getCause();
}
@@ -705,22 +716,4 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return res;
}
-
- /**
- * Convert list of {@link PeerId} to list of {@link Peer}.
- *
- * @param peers List of {@link PeerId}
- * @return List of {@link Peer}
- */
- private List<Peer> convertPeerIdList(List<PeerId> peers) {
- if (peers == null)
- return Collections.emptyList();
-
- List<Peer> res = new ArrayList<>(peers.size());
-
- for (PeerId peerId: peers)
- res.add(peerFromPeerId(peerId));
-
- return res;
- }
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
index 2650c193a..732bea151 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.core;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -877,16 +878,6 @@ public class RaftGroupServiceTest {
}
- /**
- * Convert list of {@link Peer} to list of string representations.
- *
- * @param peers List of {@link Peer}
- * @return List of string representations.
- */
- private List<String> peersToIds(List<Peer> peers) {
- return peers.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
- }
-
/** */
private static class TestCommand implements WriteCommand {
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index 786cecbe0..3224fc9e4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -215,4 +217,14 @@ public class TestUtils {
return (RpcClientEx) rpcService.getRpcClient();
}
+
+ /**
+ * Converts a list of {@link Peer} to the list of their {@link PeerId} string representations, preserving the order.
+ *
+ * @param peers List of {@link Peer}; must not be {@code null}.
+ * @return List with {@code PeerId.fromPeer(peer).toString()} for every peer of the input list.
+ */
+ public static List<String> peersToIds(List<Peer> peers) {
+ return peers.stream().map(p -> PeerId.fromPeer(p).toString()).collect(Collectors.toList());
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index eb8522c31..dbe8baecd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -28,6 +28,7 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_
import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.recoverable;
import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssignmentsKeys;
@@ -46,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -468,7 +470,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partitionRaftGroupName(tblId, partId),
partId,
busyLock,
- () -> internalTbl.partitionRaftGroupService(partId),
+ movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
rebalanceScheduler)
).thenAccept(
updatedRaftGroupService -> ((InternalTableImpl) internalTbl)
@@ -1252,7 +1254,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partId,
part,
busyLock,
- () -> tbl.internalTable().partitionRaftGroupService(part),
+ movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
rebalanceScheduler);
// Stable assignments from the meta store, which revision is bounded by the current pending event.
@@ -1362,6 +1364,50 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
});
}
+ /**
+ * Builds a function that calls {@link RaftGroupService#changePeersAsync(java.util.List, long)} on the raft group service of a
+ * partition, so that the nodes of the corresponding raft group can be reconfigured.
+ * If the call completes exceptionally, the error is logged and the same call is repeated from {@code rebalanceScheduler}; every
+ * attempt is guarded by the busy lock and fails with a wrapped {@code NodeStoppingException} once the lock cannot be entered.
+ *
+ * @param raftGroupServiceSupplier Supplier of the raft group service of a partition; it is queried again on every attempt.
+ * @return Function which takes new peers and a term and performs {@link RaftGroupService#changePeersAsync(java.util.List, long)}.
+ */
+ BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartition(Supplier<RaftGroupService> raftGroupServiceSupplier) {
+ return (List<Peer> peers, Long term) -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(new NodeStoppingException());
+ }
+ try {
+ return raftGroupServiceSupplier.get().changePeersAsync(peers, term).handleAsync((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(new NodeStoppingException());
+ }
+ try {
+ if (err != null) {
+ if (recoverable(err)) {
+ LOG.warn("Recoverable error received during changePeersAsync invocation, retrying", err);
+ } else {
+ // TODO: Ideally, rebalance, which has initiated this invocation should be canceled,
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17056
+ // TODO: Also it might be reasonable to delegate such exceptional case to a general failure handler.
+ // TODO: At the moment, we repeat such intents as well.
+ LOG.error("Unrecoverable error received during changePeersAsync invocation, retrying", err);
+ }
+ return movePartition(raftGroupServiceSupplier).apply(peers, term);
+ }
+
+ return CompletableFuture.<Void>completedFuture(null);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, rebalanceScheduler).thenCompose(Function.identity());
+ } finally {
+ busyLock.leaveBusy();
+ }
+ };
+ }
+
/**
* Gets a direct accessor for the configuration distributed property.
* If the metadata access only locally configured the method will return local property accessor.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index f625fed47..d8df8b066 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -36,7 +36,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
+import java.util.function.BiFunction;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -51,7 +51,6 @@ import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
@@ -82,8 +81,8 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
/** Executor for scheduling rebalance retries. */
private final ScheduledExecutorService rebalanceScheduler;
- /** Supplier of client for raft group of rebalance listener. */
- private final Supplier<RaftGroupService> raftGroupServiceSupplier;
+ /** Function that performs a reconfiguration of a raft group of a partition. */
+ private final BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn;
/** Attempts to retry the current rebalance in case of errors. */
private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
@@ -109,14 +108,14 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
String partId,
int partNum,
IgniteSpinBusyLock busyLock,
- Supplier<RaftGroupService> raftGroupServiceSupplier,
+ BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn,
ScheduledExecutorService rebalanceScheduler) {
this.metaStorageMgr = metaStorageMgr;
this.tblConfiguration = tblConfiguration;
this.partId = partId;
this.partNum = partNum;
this.busyLock = busyLock;
- this.raftGroupServiceSupplier = raftGroupServiceSupplier;
+ this.movePartitionFn = movePartitionFn;
this.rebalanceScheduler = rebalanceScheduler;
}
@@ -136,18 +135,15 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
try {
rebalanceAttempts.set(0);
- metaStorageMgr.get(pendingPartAssignmentsKey(partId))
- .thenCompose(pendingEntry -> {
- if (!pendingEntry.empty()) {
- List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+ Entry pendingEntry = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get();
- return raftGroupServiceSupplier.get().changePeersAsync(clusterNodesToPeers(pendingNodes), term);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).get();
+ if (!pendingEntry.empty()) {
+ List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+
+ movePartitionFn.apply(clusterNodesToPeers(pendingNodes), term).join();
+ }
} catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-17013 errors during this call should be handled by retry logic
+ // TODO: IGNITE-14693
LOG.error("Couldn't start rebalance for partition {} of table {} on new elected leader for term {}",
e, partNum, tblConfiguration.name().value(), term);
} finally {
@@ -232,10 +228,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
LOG.info("Started {} attempt to retry the current rebalance for the partId = {}.", rebalanceAttempts.get(), partId);
try {
- raftGroupServiceSupplier.get().changePeersAsync(peerIdsToPeers(peers), term).get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-17013 errors during this call should be handled by retry logic
- LOG.error("Error during the rebalance retry for the partId = {}", e, partId);
+ movePartitionFn.apply(peerIdsToPeers(peers), term).join();
} finally {
busyLock.leaveBusy();
}
@@ -290,7 +283,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
rebalanceAttempts.set(0);
} catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-17013 errors during this call should be handled by retry logic
+ // TODO: IGNITE-14693
LOG.error("Couldn't commit new partition configuration to metastore for table = {}, partition = {}",
e, tblConfiguration.name(), partNum);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 90caa2bdd..47e253746 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -169,4 +169,15 @@ public class RebalanceUtil {
return Integer.parseInt(strKey.substring(strKey.indexOf("_part_") + "_part_".length()));
}
+
+ /**
+ * Checks if an error is recoverable, so that a rebalance intent can be retried. Currently every error is reported as recoverable.
+ *
+ * @param t The throwable; it is not inspected by the current implementation.
+ * @return {@code true} if this is a recoverable exception (always {@code true} at the moment).
+ */
+ public static boolean recoverable(Throwable t) {
+ // As long as we don't have a general failure handler, we assume that all errors are recoverable.
+ return true;
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
similarity index 81%
rename from modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
rename to modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 835df74c3..311516704 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -15,8 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.table.distributed;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -29,7 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
@@ -45,10 +51,16 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
@@ -82,19 +94,27 @@ import org.apache.ignite.internal.storage.pagememory.PageMemoryStorageEngine;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageChange;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.CliRequests;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.TableDefinition;
@@ -157,6 +177,14 @@ public class TableManagerTest extends IgniteAbstractTest {
@Mock
MetaStorageManager msm;
+ /** Mock messaging service. */
+ @Mock
+ private MessagingService messagingService;
+
+ /** Mock cluster. */
+ @Mock
+ private ClusterService cluster;
+
/**
* Revision listener holder. It uses for the test configurations:
* <ul>
@@ -229,7 +257,9 @@ public class TableManagerTest extends IgniteAbstractTest {
tblManagerFut.join().beforeNodeStop();
tblManagerFut.join().stop();
- dsm.stop();
+ if (dsm != null) {
+ dsm.stop();
+ }
sm.stop();
}
@@ -612,6 +642,112 @@ public class TableManagerTest extends IgniteAbstractTest {
return tbl2;
}
+ /**
+ * Tests that {@link RaftGroupServiceImpl#changePeersAsync(java.util.List, long)}, invoked through {@code movePartition}, is retried.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testChangePeersAsyncRetryLogic() throws Exception {
+ RaftMessagesFactory factory = new RaftMessagesFactory();
+
+ List<Peer> nodes = Stream.of(20000, 20001, 20002)
+ .map(port -> new NetworkAddress("localhost", port))
+ .map(Peer::new)
+ .collect(Collectors.toUnmodifiableList());
+
+ int timeout = 1000;
+
+ int delay = 200;
+
+ Peer leader = nodes.get(0);
+
+ when(cluster.messagingService()).thenReturn(messagingService);
+
+ TableManager tableManager = createTableManager(tblManagerFut, false);
+
+ ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME));
+
+ String groupId = "test";
+
+ List<String> shrunkPeers = peersToIds(nodes.subList(0, 1));
+
+ List<String> extendedPeers = peersToIds(nodes);
+
+ AtomicLong firstInvocationOfChangePeersAsync = new AtomicLong(0L);
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(factory.changePeersAsyncRequest()
+ .newPeersList(shrunkPeers)
+ .term(1L)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation -> {
+ if (firstInvocationOfChangePeersAsync.get() == 0) {
+ firstInvocationOfChangePeersAsync.set(System.currentTimeMillis());
+ return failedFuture(new TimeoutException());
+ } else {
+ if (firstInvocationOfChangePeersAsync.get() + timeout < System.currentTimeMillis()) {
+ // More than 'timeout' ms passed since the first failed request, so this is a new changePeersAsync call: count it.
+ counter.incrementAndGet();
+
+ return completedFuture(factory.changePeersAsyncResponse().newPeersList(extendedPeers).build());
+ }
+ }
+
+ return failedFuture(new TimeoutException());
+ });
+
+ when(messagingService.invoke(any(NetworkAddress.class), any(CliRequests.GetLeaderRequest.class), anyLong()))
+ .then(invocation -> {
+ PeerId leader0 = PeerId.fromPeer(leader);
+
+ Object resp = leader0 == null
+ ? factory.errorResponse().errorCode(RaftError.EPERM.getNumber()).build()
+ : factory.getLeaderResponse().leaderId(leader0.toString()).currentTerm(1L).build();
+
+ return completedFuture(resp);
+ });
+
+ RaftGroupService service = RaftGroupServiceImpl.start(groupId, cluster, factory, timeout, nodes.subList(0, 2),
+ true, delay, executor).get(3, TimeUnit.SECONDS);
+
+ tableManager.movePartition(() -> service).apply(nodes.subList(0, 1), 1L).join();
+
+ assertEquals(1, counter.get());
+
+ AtomicLong secondInvocationOfChangePeersAsync = new AtomicLong(0L);
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(factory.changePeersAsyncRequest()
+ .newPeersList(shrunkPeers)
+ .term(1L)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation -> {
+ if (secondInvocationOfChangePeersAsync.get() == 0) {
+ secondInvocationOfChangePeersAsync.set(System.currentTimeMillis());
+
+ return failedFuture(new NullPointerException());
+ } else {
+ if (secondInvocationOfChangePeersAsync.get() + timeout < System.currentTimeMillis()) {
+ // More than 'timeout' ms passed since the first failed request, so this is a new changePeersAsync call: count it.
+ counter.incrementAndGet();
+
+ return completedFuture(factory.changePeersAsyncResponse().newPeersList(extendedPeers).build());
+ }
+ }
+
+ return failedFuture(new NullPointerException());
+ });
+
+ tableManager.movePartition(() -> service).apply(nodes.subList(0, 1), 1L).join();
+
+ assertEquals(2, counter.get());
+
+ shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+ }
+
/**
* Creates Table manager.
*