You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/30 14:43:43 UTC
[ignite-3] branch main updated: IGNITE-17196 In-memory raft group reconfiguration on node failure (#1016)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 467f36afb1 IGNITE-17196 In-memory raft group reconfiguration on node failure (#1016)
467f36afb1 is described below
commit 467f36afb16d42ad8340ebe5e702d4f5cb999d7e
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Aug 30 18:43:39 2022 +0400
IGNITE-17196 In-memory raft group reconfiguration on node failure (#1016)
---
.../ignite/internal/affinity/AffinityUtils.java | 57 +++-
.../affinity/RendezvousAffinityFunction.java | 52 ++-
.../org/apache/ignite/network/ClusterNode.java | 5 +-
.../org/apache/ignite/network/NetworkAddress.java | 5 +-
.../internal/compute/ComputeMessageTypes.java | 2 +-
.../ignite/internal/thread/NamedThreadFactory.java | 12 +
.../org/apache/ignite/internal/util/ByteUtils.java | 4 +-
.../client/ItMetaStorageRaftGroupTest.java | 2 +-
.../ItMetaStorageServicePersistenceTest.java | 2 +-
.../client/ItMetaStorageServiceTest.java | 4 +-
.../metastorage/client/MetaStorageServiceImpl.java | 10 +-
.../metastorage/client/SimpleCondition.java | 12 +
.../internal/metastorage/MetaStorageManager.java | 2 +-
.../metastorage/MetaStorageRangeCursorTest.java | 2 +-
.../processor/messages/MessageImplGenerator.java | 13 +
.../ignite/internal/network/netty/NettyUtils.java | 6 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 20 ++
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 53 ++-
.../rpc/impl/cli/IgniteCliRpcRequestClosure.java | 6 +-
.../storage/ItRebalanceDistributedTest.java | 7 +-
.../app/ItIgniteInMemoryNodeRestartTest.java | 354 +++++++++++++++++++++
.../runner/app/ItIgniteNodeRestartTest.java | 3 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +
.../sql/engine/exec/MockedStructuresTest.java | 4 +
modules/table/pom.xml | 10 +
.../internal/table/distributed/TableManager.java | 265 ++++++++++++---
.../raft/RebalanceRaftGroupEventsListener.java | 297 ++++++++++++-----
.../internal/table/message/HasDataRequest.java} | 26 +-
.../internal/table/message/HasDataResponse.java} | 24 +-
.../internal/table/message/TableMessageGroup.java} | 22 +-
.../ignite/internal/utils/RebalanceUtil.java | 278 +++++++++++++++-
.../table/distributed/TableManagerTest.java | 10 +-
.../raft/RebalanceRaftGroupEventsListenerTest.java | 2 +-
33 files changed, 1354 insertions(+), 225 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
index 557d512124..58dbbc4eeb 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
@@ -17,8 +17,12 @@
package org.apache.ignite.internal.affinity;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.function.IntFunction;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
@@ -30,7 +34,31 @@ public class AffinityUtils {
* Calculates affinity assignments.
*
* @param partitions Partitions count.
- * @param replicas Replicas count.
+ * @param replicas Replicas count.
+ * @param aggregator Function that creates a collection for the partition assignments.
+ * @return List of node collections, one per partition.
+ */
+ public static <T extends Collection<ClusterNode>> List<T> calculateAssignments(
+ @NotNull Collection<ClusterNode> baselineNodes,
+ int partitions,
+ int replicas,
+ IntFunction<T> aggregator
+ ) {
+ return RendezvousAffinityFunction.assignPartitions(
+ baselineNodes,
+ partitions,
+ replicas,
+ false,
+ null,
+ aggregator
+ );
+ }
+
+ /**
+ * Calculates affinity assignments.
+ *
+ * @param partitions Partitions count.
+ * @param replicas Replicas count.
* @return List nodes by partition.
*/
public static List<List<ClusterNode>> calculateAssignments(
@@ -38,12 +66,35 @@ public class AffinityUtils {
int partitions,
int replicas
) {
- return RendezvousAffinityFunction.assignPartitions(
+ return calculateAssignments(
baselineNodes,
partitions,
replicas,
+ ArrayList::new
+ );
+ }
+
+ /**
+ * Calculates affinity assignments for single partition.
+ *
+ * @param baselineNodes Nodes.
+ * @param partition Partition id.
+ * @param replicas Replicas count.
+ * @return Set of nodes.
+ */
+ public static Set<ClusterNode> calculateAssignmentForPartition(
+ Collection<ClusterNode> baselineNodes,
+ int partition,
+ int replicas
+ ) {
+ return RendezvousAffinityFunction.assignPartition(
+ partition,
+ new ArrayList<>(baselineNodes),
+ replicas,
+ null,
false,
- null
+ null,
+ HashSet::new
);
}
}
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
index fe16b07b0f..0a60caa22a 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiPredicate;
+import java.util.function.IntFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -85,18 +86,24 @@ public class RendezvousAffinityFunction {
* @param neighborhoodCache Neighborhood.
* @param exclNeighbors If true neighbors are excluded, false otherwise.
* @param nodeFilter Filter for nodes.
+ * @param aggregator Function that creates a collection for the partition assignments.
* @return Assignment.
*/
- public static List<ClusterNode> assignPartition(
+ public static <T extends Collection<ClusterNode>> T assignPartition(
int part,
List<ClusterNode> nodes,
int replicas,
Map<String, Collection<ClusterNode>> neighborhoodCache,
boolean exclNeighbors,
- BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+ BiPredicate<ClusterNode, T> nodeFilter,
+ IntFunction<T> aggregator
) {
if (nodes.size() <= 1) {
- return nodes;
+ T res = aggregator.apply(1);
+
+ res.addAll(nodes);
+
+ return res;
}
IgniteBiTuple<Long, ClusterNode>[] hashArr =
@@ -118,12 +125,12 @@ public class RendezvousAffinityFunction {
// REPLICATED cache case
if (replicas == Integer.MAX_VALUE) {
- return replicatedAssign(nodes, sortedNodes);
+ return replicatedAssign(nodes, sortedNodes, aggregator);
}
Iterator<ClusterNode> it = sortedNodes.iterator();
- List<ClusterNode> res = new ArrayList<>(effectiveReplicas);
+ T res = aggregator.apply(effectiveReplicas);
Collection<ClusterNode> allNeighbors = new HashSet<>();
@@ -188,13 +195,14 @@ public class RendezvousAffinityFunction {
*
* @param nodes Topology.
* @param sortedNodes Sorted for specified partitions nodes.
+ * @param aggregator Function that creates a collection for the partition assignments.
* @return Assignment.
*/
- private static List<ClusterNode> replicatedAssign(List<ClusterNode> nodes,
- Iterable<ClusterNode> sortedNodes) {
+ private static <T extends Collection<ClusterNode>> T replicatedAssign(List<ClusterNode> nodes,
+ Iterable<ClusterNode> sortedNodes, IntFunction<T> aggregator) {
ClusterNode first = sortedNodes.iterator().next();
- List<ClusterNode> res = new ArrayList<>(nodes.size());
+ T res = aggregator.apply(nodes.size());
res.add(first);
@@ -238,7 +246,7 @@ public class RendezvousAffinityFunction {
* @param currentTopologySnapshot List of topology nodes.
* @param partitions Number of table partitions.
* @param replicas Number partition replicas.
- * @param exclNeighbors If true neighbors are excluded fro the one partition assignment, false otherwise.
+ * @param exclNeighbors If true neighbors are excluded from the one partition assignment, false otherwise.
* @param nodeFilter Filter for nodes.
* @return List nodes by partition.
*/
@@ -248,19 +256,41 @@ public class RendezvousAffinityFunction {
int replicas,
boolean exclNeighbors,
BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+ ) {
+ return assignPartitions(currentTopologySnapshot, partitions, replicas, exclNeighbors, nodeFilter, ArrayList::new);
+ }
+
+ /**
+ * Generates an assignment by the given parameters.
+ *
+ * @param currentTopologySnapshot List of topology nodes.
+ * @param partitions Number of table partitions.
+ * @param replicas Number of partition replicas.
+ * @param exclNeighbors If true neighbors are excluded from the one partition assignment, false otherwise.
+ * @param nodeFilter Filter for nodes.
+ * @param aggregator Function that creates a collection for the partition assignments.
+ * @return List of node collections, one per partition.
+ */
+ public static <T extends Collection<ClusterNode>> List<T> assignPartitions(
+ Collection<ClusterNode> currentTopologySnapshot,
+ int partitions,
+ int replicas,
+ boolean exclNeighbors,
+ BiPredicate<ClusterNode, T> nodeFilter,
+ IntFunction<T> aggregator
) {
assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + MAX_PARTITIONS_COUNT;
assert partitions > 0 : "parts > 0";
assert replicas > 0 : "replicas > 0";
- List<List<ClusterNode>> assignments = new ArrayList<>(partitions);
+ List<T> assignments = new ArrayList<>(partitions);
Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? neighbors(currentTopologySnapshot) : null;
List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot);
for (int i = 0; i < partitions; i++) {
- List<ClusterNode> partAssignment = assignPartition(i, nodes, replicas, neighborhoodCache, exclNeighbors, nodeFilter);
+ T partAssignment = assignPartition(i, nodes, replicas, neighborhoodCache, exclNeighbors, nodeFilter, aggregator);
assignments.add(partAssignment);
}
diff --git a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index e267d292e4..f9e8088602 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -18,7 +18,6 @@
package org.apache.ignite.network;
import java.io.Serializable;
-import java.util.Objects;
import org.apache.ignite.internal.tostring.S;
/**
@@ -90,7 +89,9 @@ public class ClusterNode implements Serializable {
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(name, address);
+ int result = name.hashCode();
+ result = 31 * result + address.hashCode();
+ return result;
}
/** {@inheritDoc} */
diff --git a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
index 5ca50da82c..fdeff7ac8a 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
@@ -149,7 +149,10 @@ public class NetworkAddress implements Serializable {
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(host, port, consistentId);
+ int result = host.hashCode();
+ result = 31 * result + port;
+ result = 31 * result + (consistentId != null ? consistentId.hashCode() : 0);
+ return result;
}
/** {@inheritDoc} */
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 8f684ba0c2..1e5a4f54ce 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -24,7 +24,7 @@ import org.apache.ignite.network.annotations.MessageGroup;
/**
* Message types for the Compute module.
*/
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
+@MessageGroup(groupType = 6, groupName = "ComputeMessages")
public class ComputeMessageTypes {
/**
* Type for {@link ExecuteRequest}.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index ec9869cc4f..20b065717a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -91,4 +91,16 @@ public class NamedThreadFactory implements ThreadFactory {
public static String threadPrefix(String nodeName, String poolName) {
return IgniteThread.threadPrefix(nodeName, poolName);
}
+
+ /**
+ * Creates a thread factory based on a node's name and a name of the pool.
+ *
+ * @param nodeName Node name.
+ * @param poolName Pool name.
+ * @param logger Logger.
+ * @return Thread factory.
+ */
+ public static NamedThreadFactory create(String nodeName, String poolName, IgniteLogger logger) {
+ return new NamedThreadFactory(threadPrefix(nodeName, poolName), logger);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index abcf06e3c3..180aca3a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -123,12 +123,12 @@ public class ByteUtils {
* @param bytes Byte array.
* @return Object.
*/
- public static Object fromBytes(byte[] bytes) {
+ public static <T> T fromBytes(byte[] bytes) {
try (
var bis = new ByteArrayInputStream(bytes);
var in = new ObjectInputStream(bis)
) {
- return in.readObject();
+ return (T) in.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new IgniteInternalException("Could not deserialize an object", e);
}
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index 03ff8611a8..566db7580e 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -259,7 +259,7 @@ public class ItMetaStorageRaftGroupTest {
.findFirst().get().value;
MetaStorageService metaStorageSvc = new MetaStorageServiceImpl(
- raftGroupServiceOfLiveServer, "some_node");
+ raftGroupServiceOfLiveServer, "some_node", "some_node");
Cursor<Entry> cursor = metaStorageSvc.range(EXPECTED_RESULT_ENTRY1.key(), new ByteArray(new byte[]{4}));
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
index 5b2781c5f7..57972b0e6d 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
@@ -65,7 +65,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps
/** {@inheritDoc} */
@Override
public void beforeFollowerStop(RaftGroupService service) throws Exception {
- metaStorage = new MetaStorageServiceImpl(service, null);
+ metaStorage = new MetaStorageServiceImpl(service, null, null);
// Put some data in the metastorage
metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index fcd9dae271..ffc2c383b6 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -923,7 +923,7 @@ public class ItMetaStorageServiceTest {
).get(3, TimeUnit.SECONDS);
try {
- MetaStorageService metaStorageSvc2 = new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_1);
+ MetaStorageService metaStorageSvc2 = new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_1, NODE_ID_1);
Cursor<Entry> cursorNode0 = metaStorageSvc.range(EXPECTED_RESULT_ENTRY.key(), null);
@@ -1078,6 +1078,6 @@ public class ItMetaStorageServiceTest {
executor
).get(3, TimeUnit.SECONDS);
- return new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_0);
+ return new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_0, NODE_ID_0);
}
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index eb2895c7f3..d2102beabc 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -91,16 +91,21 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** Local node id. */
private final String localNodeId;
+ /** Local node name. */
+ private final String localNodeName;
+
/**
* Constructor.
*
* @param metaStorageRaftGrpSvc Meta storage raft group service.
* @param localNodeId Local node id.
+ * @param localNodeName Local node name.
*/
- public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, String localNodeId) {
+ public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, String localNodeId, String localNodeName) {
this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
this.watchProcessor = new WatchProcessor();
this.localNodeId = localNodeId;
+ this.localNodeName = localNodeName;
}
/** {@inheritDoc} */
@@ -533,6 +538,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
* @param lsnr The listener which receives and handles watch updates.
*/
Watcher(Cursor<WatchEvent> cursor, WatchListener lsnr) {
+ setName("ms-watcher-" + localNodeName);
this.cursor = cursor;
this.lsnr = lsnr;
}
@@ -554,6 +560,8 @@ public class MetaStorageServiceImpl implements MetaStorageService {
watchEvt = watchEvtsIter.next();
} catch (Throwable e) {
lsnr.onError(e);
+
+ throw e;
}
assert watchEvt != null;
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java
index 6382a0daee..36be2553a9 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java
@@ -77,6 +77,8 @@ public final class SimpleCondition implements Condition {
* @throws IllegalStateException In the case when the condition is already defined.
*/
public SimpleCondition eq(long rev) {
+ assert rev > 0 : "Revision must be positive.";
+
validate(type());
type(ConditionType.REV_EQUAL);
@@ -95,6 +97,8 @@ public final class SimpleCondition implements Condition {
* @throws IllegalStateException In the case when the condition is already defined.
*/
public SimpleCondition ne(long rev) {
+ assert rev > 0 : "Revision must be positive.";
+
validate(type());
type(ConditionType.REV_NOT_EQUAL);
@@ -113,6 +117,8 @@ public final class SimpleCondition implements Condition {
* @throws IllegalStateException In the case when the condition is already defined.
*/
public SimpleCondition gt(long rev) {
+ assert rev > 0 : "Revision must be positive.";
+
validate(type());
type(ConditionType.REV_GREATER);
@@ -131,6 +137,8 @@ public final class SimpleCondition implements Condition {
* @throws IllegalStateException In the case when the condition is already defined.
*/
public SimpleCondition ge(long rev) {
+ assert rev > 0 : "Revision must be positive.";
+
validate(type());
type(ConditionType.REV_GREATER_OR_EQUAL);
@@ -149,6 +157,8 @@ public final class SimpleCondition implements Condition {
* @throws IllegalStateException In the case when the condition is already defined.
*/
public SimpleCondition lt(long rev) {
+ assert rev > 0 : "Revision must be positive.";
+
validate(type());
type(ConditionType.REV_LESS);
@@ -167,6 +177,8 @@ public final class SimpleCondition implements Condition {
* @throws IllegalStateException In the case when the condition is already defined.
*/
public SimpleCondition le(long rev) {
+ assert rev > 0 : "Revision must be positive.";
+
validate(type());
type(ConditionType.REV_LESS_OR_EQUAL);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 542b3e544c..6284048e92 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -185,7 +185,7 @@ public class MetaStorageManager implements IgniteComponent {
RaftGroupOptions.defaults()
);
- return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id()));
+ return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
} catch (NodeStoppingException e) {
return CompletableFuture.failedFuture(e);
}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java
index 44d274bdfd..fb4176bf48 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java
@@ -84,7 +84,7 @@ public class MetaStorageRangeCursorTest {
when(raftGroupService.run(any())).thenAnswer(invocation -> runCommand(invocation.getArgument(0)));
- MetaStorageService metaStorageService = new MetaStorageServiceImpl(raftGroupService, "test");
+ MetaStorageService metaStorageService = new MetaStorageServiceImpl(raftGroupService, "test", "test");
checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo)), limit);
checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo), 0), limit);
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index a8393dc84c..f5b28328fe 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -49,6 +49,8 @@ import javax.tools.Diagnostic;
import org.apache.ignite.internal.network.processor.MessageClass;
import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Marshallable;
@@ -105,6 +107,7 @@ public class MessageImplGenerator {
String getterName = getter.getSimpleName().toString();
FieldSpec.Builder fieldBuilder = FieldSpec.builder(getterReturnType, getterName)
+ .addAnnotation(IgniteToStringInclude.class)
.addModifiers(Modifier.PRIVATE);
boolean isMarshallable = getter.getAnnotation(Marshallable.class) != null;
@@ -163,6 +166,16 @@ public class MessageImplGenerator {
messageImpl.addMethod(groupTypeMethod);
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17591
+ MethodSpec toStringMethod = MethodSpec.methodBuilder("toString")
+ .addAnnotation(Override.class)
+ .addModifiers(Modifier.PUBLIC)
+ .returns(String.class)
+ .addStatement("return $T.toString($T.class, this)", S.class, messageImplClassName)
+ .build();
+
+ messageImpl.addMethod(toStringMethod);
+
// message type constant and getter
FieldSpec messageTypeField = FieldSpec.builder(short.class, "TYPE")
.addModifiers(Modifier.PUBLIC, Modifier.STATIC, Modifier.FINAL)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
index e8709c84b4..53747ef562 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
@@ -22,6 +22,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.jetbrains.annotations.Async.Execute;
+import org.jetbrains.annotations.Async.Schedule;
/**
* Netty utilities.
@@ -38,12 +40,12 @@ public class NettyUtils {
* @return CompletableFuture.
*/
public static <T, R, F extends Future<R>> CompletableFuture<T> toCompletableFuture(
- F nettyFuture,
+ @Schedule F nettyFuture,
Function<F, T> mapper
) {
var fut = new CompletableFuture<T>();
- nettyFuture.addListener((F future) -> {
+ nettyFuture.addListener((@Execute F future) -> {
if (future.isSuccess()) {
fut.complete(mapper.apply(future));
} else if (future.isCancelled()) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index eb5f0c325d..3fec3bcdaf 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -43,6 +43,8 @@ import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
@@ -325,6 +327,24 @@ public class Loza implements IgniteComponent {
}
}
+ /**
+ * Returns messaging service.
+ *
+ * @return Messaging service.
+ */
+ public MessagingService messagingService() {
+ return clusterNetSvc.messagingService();
+ }
+
+ /**
+ * Returns topology service.
+ *
+ * @return Topology service.
+ */
+ public TopologyService topologyService() {
+ return clusterNetSvc.topologyService();
+ }
+
/**
* Returns a cluster service.
*
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index fc9ed2b9dd..880c91a612 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
@@ -582,7 +583,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
LOG.warn("Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
err, req.getClass().getSimpleName());
- sendWithRetry(randomNode(), req, stopTime, fut);
+ sendWithRetry(randomNode(peer), req, stopTime, fut);
return null;
}, retryDelay, TimeUnit.MILLISECONDS);
@@ -603,7 +604,17 @@ public class RaftGroupServiceImpl implements RaftGroupService {
resp0.errorCode() == (RaftError.EAGAIN.getNumber()) ||
resp0.errorCode() == (RaftError.ENOENT.getNumber())) { // Possibly a node has not been started.
executor.schedule(() -> {
- sendWithRetry(peer, req, stopTime, fut);
+ Peer targetPeer = peer;
+
+ if (resp0.errorCode() == RaftError.ENOENT.getNumber()) {
+ // If a change-peers or get-leader request failed with ENOENT (something was not found),
+ // the target peer is probably rebalancing, so try another peer.
+ if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+ targetPeer = randomNode(peer);
+ }
+ }
+
+ sendWithRetry(targetPeer, req, stopTime, fut);
return null;
}, retryDelay, TimeUnit.MILLISECONDS);
@@ -614,7 +625,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
resp0.errorCode() == RaftError.EINTERNAL.getNumber()) {
if (resp0.leaderId() == null) {
executor.schedule(() -> {
- sendWithRetry(randomNode(), req, stopTime, fut);
+ sendWithRetry(randomNode(peer), req, stopTime, fut);
return null;
}, retryDelay, TimeUnit.MILLISECONDS);
@@ -679,15 +690,45 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return t instanceof TimeoutException || t instanceof IOException;
}
+ private Peer randomNode() {
+ return randomNode(null);
+ }
+
/**
- * @return Random node.
+ * Returns a random peer. Makes up to 5 attempts to find a peer different from the excluded peer.
+ * If the excluded peer is null, just returns a random peer.
+ *
+ * @param excludedPeer Excluded peer.
+ * @return Random peer.
*/
- private Peer randomNode() {
+ private Peer randomNode(Peer excludedPeer) {
List<Peer> peers0 = peers;
assert peers0 != null && !peers0.isEmpty();
- return peers0.get(current().nextInt(peers0.size()));
+ int lastPeerIndex = -1;
+
+ if (excludedPeer != null) {
+ lastPeerIndex = peers0.indexOf(excludedPeer);
+ }
+
+ int retries = 0;
+
+ ThreadLocalRandom random = current();
+
+ int newIdx = 0;
+
+ while (retries < 5) {
+ newIdx = random.nextInt(peers0.size());
+
+ if (newIdx != lastPeerIndex) {
+ break;
+ }
+
+ retries++;
+ }
+
+ return peers0.get(newIdx);
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
index 42f8c2a883..dd8a1acc65 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
@@ -74,9 +74,9 @@ public class IgniteCliRpcRequestClosure implements Closure {
if (err.errorCode() == RaftError.EPERM.getNumber()) {
newLeader = node.getLeaderId();
- delegate.sendResponse(
- RaftRpcFactory.DEFAULT
- .newResponse(newLeader.toString(), getMsgFactory(), RaftError.EPERM, err.errorMsg()));
+ String leaderId = newLeader != null ? newLeader.toString() : null;
+
+ delegate.sendResponse(RaftRpcFactory.DEFAULT.newResponse(leaderId, getMsgFactory(), RaftError.EPERM, err.errorMsg()));
return;
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index b8afff7126..e064f0207a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -335,7 +335,7 @@ public class ItRebalanceDistributedTest {
return nodes.stream().filter(n -> n.address().equals(addr)).findFirst().get();
}
- private List<ClusterNode> getPartitionClusterNodes(int nodeNum, int partNum) {
+ private Set<ClusterNode> getPartitionClusterNodes(int nodeNum, int partNum) {
var table = ((ExtendedTableConfiguration) nodes.get(nodeNum).clusterCfgMgr.configurationRegistry()
.getConfiguration(TablesConfiguration.KEY).tables().get("PUBLIC.TBL1"));
@@ -343,11 +343,11 @@ public class ItRebalanceDistributedTest {
var assignments = table.assignments().value();
if (assignments != null) {
- return ((List<List<ClusterNode>>) ByteUtils.fromBytes(assignments)).get(partNum);
+ return ((List<Set<ClusterNode>>) ByteUtils.fromBytes(assignments)).get(partNum);
}
}
- return List.of();
+ return Set.of();
}
private static class Node {
@@ -480,6 +480,7 @@ public class ItRebalanceDistributedTest {
schemaManager = new SchemaManager(registry, tablesCfg);
tableManager = new TableManager(
+ name,
registry,
tablesCfg,
raftManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
new file mode 100644
index 0000000000..41179fe67b
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageChange;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * These tests check in-memory node restart scenarios.
+ */
+public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
+ /** Default node port; the node with index {@code idx} uses {@code DEFAULT_NODE_PORT + idx}. */
+ private static final int DEFAULT_NODE_PORT = 3344;
+
+ /** Value producer for table data, is used to create data and check it later. */
+ private static final IntFunction<String> VALUE_PRODUCER = i -> "val " + i;
+
+ /** Prefix for full table name. */
+ private static final String SCHEMA_PREFIX = "PUBLIC.";
+
+ /** Test table name. */
+ private static final String TABLE_NAME = "Table1";
+
+ /** Nodes bootstrap configuration pattern; the two {@code {}} placeholders take the node port and the node finder addresses. */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " network.port: {},\n"
+ + " network.nodeFinder.netClusterNodes: {}\n"
+ + "}";
+
+ /** Cluster nodes, addressed by node index; the slot of a stopped node is set to {@code null}. */
+ private static final List<Ignite> CLUSTER_NODES = new ArrayList<>();
+
+ /** Names of the cluster nodes, addressed by node index; the slot of a stopped node is set to {@code null}. */
+ private static final List<String> CLUSTER_NODES_NAMES = new ArrayList<>();
+
+    /**
+     * Stops all started nodes and resets the node registries.
+     *
+     * <p>The registries are static and therefore shared by all tests of this class, so they are cleared even when
+     * stopping a node fails; otherwise stale entries would leak into the next test.
+     *
+     * @throws Exception If closing the nodes fails.
+     */
+    @AfterEach
+    public void afterEach() throws Exception {
+        var closeables = new ArrayList<AutoCloseable>();
+
+        for (String name : CLUSTER_NODES_NAMES) {
+            // Slots of nodes that were stopped by the test itself hold null and need no further action.
+            if (name != null) {
+                closeables.add(() -> IgnitionManager.stop(name));
+            }
+        }
+
+        try {
+            IgniteUtils.closeAll(closeables);
+        } finally {
+            CLUSTER_NODES.clear();
+            CLUSTER_NODES_NAMES.clear();
+        }
+    }
+
+    /**
+     * Start node with the given parameters.
+     *
+     * <p>If the node registry is empty when the node is being started, the cluster is also initialized with this node
+     * as the only entry of the node list passed to {@code IgnitionManager.init}.
+     *
+     * @param idx Node index, is used to stop the node later, see {@link #stopNode(int)}. Must be either the next free
+     *      index or the index of a slot emptied by {@link #stopNode(int)}.
+     * @param nodeName Node name.
+     * @param cfgString Configuration string.
+     * @param workDir Working directory; the node uses the {@code nodeName} subdirectory of it.
+     * @return Created node instance.
+     */
+    private static IgniteImpl startNode(int idx, String nodeName, @Nullable String cfgString, Path workDir) {
+        assertTrue(CLUSTER_NODES.size() == idx || CLUSTER_NODES.get(idx) == null);
+
+        // The name is registered before the start is awaited; afterEach() stops nodes by these names.
+        putAt(CLUSTER_NODES_NAMES, idx, nodeName);
+
+        CompletableFuture<Ignite> future = IgnitionManager.start(nodeName, cfgString, workDir.resolve(nodeName));
+
+        // Emptied slots keep the registry non-empty, so a restarted node does not initialize the cluster again.
+        if (CLUSTER_NODES.isEmpty()) {
+            IgnitionManager.init(nodeName, List.of(nodeName), "cluster");
+        }
+
+        assertThat(future, willCompleteSuccessfully());
+
+        Ignite ignite = future.join();
+
+        putAt(CLUSTER_NODES, idx, ignite);
+
+        return (IgniteImpl) ignite;
+    }
+
+    /**
+     * Stores a value under the given index of a registry list: replaces the existing slot if there is one, appends
+     * otherwise. Unlike a plain {@code List.add(idx, value)}, this never shifts the entries of other nodes when a
+     * node is restarted into a previously emptied slot.
+     *
+     * @param registry Registry list.
+     * @param idx Node index.
+     * @param value Value to store.
+     */
+    private static <T> void putAt(List<T> registry, int idx, T value) {
+        if (idx < registry.size()) {
+            registry.set(idx, value);
+        } else {
+            registry.add(idx, value);
+        }
+    }
+
+    /**
+     * Starts the node with the given index, deriving its name, port and configuration from the index.
+     *
+     * @param testInfo Test info, is used to build the node name.
+     * @param idx Node index, is used to stop the node later, see {@link #stopNode(int)}.
+     * @return Created node instance.
+     */
+    private IgniteImpl startNode(TestInfo testInfo, int idx) {
+        String name = testNodeName(testInfo, DEFAULT_NODE_PORT + idx);
+
+        return startNode(idx, name, configurationString(idx), workDir.resolve(name));
+    }
+
+    /**
+     * Builds the bootstrap configuration of the node with the given index.
+     *
+     * @param idx Node index.
+     * @return Configuration string.
+     */
+    private static String configurationString(int idx) {
+        // Every node is pointed at the address of the first node.
+        @Language("HOCON") String firstNodeAddr = "[localhost\":\"" + DEFAULT_NODE_PORT + "]";
+
+        return IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, DEFAULT_NODE_PORT + idx, firstNodeAddr);
+    }
+
+    /**
+     * Stops the node registered under the given index and empties its registry slots. Does nothing if the slot is
+     * already empty.
+     *
+     * @param idx Node index.
+     */
+    private static void stopNode(int idx) {
+        Ignite running = CLUSTER_NODES.get(idx);
+
+        if (running == null) {
+            return;
+        }
+
+        IgnitionManager.stop(running.name());
+
+        CLUSTER_NODES.set(idx, null);
+        CLUSTER_NODES_NAMES.set(idx, null);
+    }
+
+    /**
+     * Restarts an in-memory node that does not lead the table's partition group and checks that the node starts the
+     * group again and serves the table data.
+     */
+    @Test
+    public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws Exception {
+        // Three-node cluster; the first node is going to be CMG and MetaStorage leader.
+        IgniteImpl firstNode = startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // A single-partition table replicated to every node.
+        createTableWithData(firstNode, TABLE_NAME, 3, 1);
+
+        TableImpl table = (TableImpl) firstNode.tables().table(SCHEMA_PREFIX + TABLE_NAME);
+        String tableId = table.tableId().toString();
+
+        // Address of the current leader of the partition's raft group.
+        RaftGroupService partitionService = table.internalTable().partitionRaftGroupService(0);
+        IgniteBiTuple<Peer, Long> leaderAndTerm = partitionService.refreshAndGetLeaderWithTerm().join();
+        NetworkAddress leaderAddress = leaderAndTerm.get1().address();
+
+        // Among nodes 1 and 2, take the first one that is not the leader.
+        int idxToStop = IntStream.rangeClosed(1, 2)
+                .filter(idx -> !leaderAddress.equals(ignite(idx).node().address()))
+                .findFirst()
+                .getAsInt();
+
+        // Restart that node.
+        stopNode(idxToStop);
+
+        IgniteImpl restartedNode = startNode(testInfo, idxToStop);
+
+        Loza loza = restartedNode.raftManager();
+
+        // The restarted node must start a raft group whose name contains the table id within 10 seconds.
+        boolean groupStarted = IgniteTestUtils.waitForCondition(
+                () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+                TimeUnit.SECONDS.toMillis(10)
+        );
+
+        assertTrue(groupStarted);
+
+        // The data must be readable through the restarted node.
+        checkTableWithData(restartedNode, TABLE_NAME);
+    }
+
+    /**
+     * Stops two of the three nodes so the majority is lost, starts one of them again and checks that it starts the
+     * table's raft group and serves the table data.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17586")
+    @Test
+    public void inMemoryNodeRestartNoMajority(TestInfo testInfo) throws Exception {
+        // Three-node cluster; the first node is going to be CMG and MetaStorage leader.
+        IgniteImpl firstNode = startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // A single-partition table replicated to every node.
+        createTableWithData(firstNode, TABLE_NAME, 3, 1);
+
+        TableImpl table = (TableImpl) firstNode.tables().table(SCHEMA_PREFIX + TABLE_NAME);
+        String tableId = table.tableId().toString();
+
+        // Two nodes out of three go down: the majority is lost.
+        stopNode(1);
+        stopNode(2);
+
+        IgniteImpl restartedNode = startNode(testInfo, 1);
+
+        Loza loza = restartedNode.raftManager();
+
+        // The restarted node must start a raft group whose name contains the table id within 10 seconds.
+        boolean groupStarted = IgniteTestUtils.waitForCondition(
+                () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+                TimeUnit.SECONDS.toMillis(10)
+        );
+
+        assertTrue(groupStarted);
+
+        // The data must be readable through the restarted node.
+        checkTableWithData(restartedNode, TABLE_NAME);
+    }
+
+    /**
+     * Stops and starts again every node that hosts the partition and checks that each of them starts the table's
+     * raft group.
+     */
+    @Test
+    public void inMemoryNodeFullPartitionRestart(TestInfo testInfo) throws Exception {
+        // Three-node cluster; the first node is going to be CMG and MetaStorage leader.
+        IgniteImpl firstNode = startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // A single-partition table replicated to every node.
+        createTableWithData(firstNode, TABLE_NAME, 3, 1);
+
+        TableImpl table = (TableImpl) firstNode.tables().table(SCHEMA_PREFIX + TABLE_NAME);
+        String tableId = table.tableId().toString();
+
+        // Bring the whole cluster down, then up again, in index order.
+        for (int idx = 0; idx < 3; idx++) {
+            stopNode(idx);
+        }
+
+        for (int idx = 0; idx < 3; idx++) {
+            startNode(testInfo, idx);
+        }
+
+        // Every node must start a raft group whose name contains the table id within 10 seconds.
+        for (int idx = 0; idx < 3; idx++) {
+            Loza loza = ignite(idx).raftManager();
+
+            boolean groupStarted = IgniteTestUtils.waitForCondition(
+                    () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+                    TimeUnit.SECONDS.toMillis(10)
+            );
+
+            assertTrue(groupStarted);
+        }
+    }
+
+    /**
+     * Checks the table exists and validates all data in it: every key from 0 to 99 must be present and map to the
+     * value produced by {@link #VALUE_PRODUCER}.
+     *
+     * @param ignite Ignite.
+     * @param name Table name, without the schema prefix.
+     */
+    private static void checkTableWithData(Ignite ignite, String name) {
+        Table table = ignite.tables().table(SCHEMA_PREFIX + name);
+
+        assertNotNull(table);
+
+        for (int i = 0; i < 100; i++) {
+            Tuple row = table.keyValueView().get(null, Tuple.create().set("id", i));
+
+            // Report a missing row as an assertion failure that names the key instead of a bare NullPointerException.
+            assertNotNull(row, "Row is missing for id=" + i);
+
+            assertEquals(VALUE_PRODUCER.apply(i), row.stringValue("name"));
+        }
+    }
+
+    /**
+     * Creates a table in the {@code PUBLIC} schema backed by the volatile page memory storage and loads 100 rows into
+     * it, with keys from 0 to 99 and values produced by {@link #VALUE_PRODUCER}.
+     *
+     * @param ignite Ignite.
+     * @param name Table name, without the schema prefix.
+     * @param replicas Replica factor.
+     * @param partitions Partitions count.
+     */
+    private static void createTableWithData(Ignite ignite, String name, int replicas, int partitions) {
+        TableDefinition definition = SchemaBuilders.tableBuilder("PUBLIC", name)
+                .columns(
+                        SchemaBuilders.column("id", ColumnType.INT32).build(),
+                        SchemaBuilders.column("name", ColumnType.string()).asNullable(true).build()
+                )
+                .withPrimaryKey(SchemaBuilders.primaryKey().withColumns("id").build())
+                .build();
+
+        Table table = ignite.tables().createTable(
+                definition.canonicalName(),
+                tblChange -> convert(definition, tblChange)
+                        .changeReplicas(replicas)
+                        .changePartitions(partitions)
+                        .changeDataStorage(storageChange -> storageChange.convert(VolatilePageMemoryDataStorageChange.class))
+        );
+
+        IntStream.range(0, 100).forEach(id ->
+                table.keyValueView().put(
+                        null,
+                        Tuple.create().set("id", id),
+                        Tuple.create().set("name", VALUE_PRODUCER.apply(id))
+                )
+        );
+    }
+
+ /**
+ * Returns the node registered under the given index, cast to {@link IgniteImpl}.
+ *
+ * @param idx Node index.
+ * @return Node instance, or {@code null} if the slot was emptied by {@link #stopNode(int)}.
+ */
+ private static IgniteImpl ignite(int idx) {
+ return (IgniteImpl) CLUSTER_NODES.get(idx);
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 347eb6493b..bbfe2ec788 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
+import org.apache.ignite.internal.table.message.TableMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -210,6 +211,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
CmgMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TableMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
@@ -270,6 +272,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
SchemaManager schemaManager = new SchemaManager(registry, tblCfg);
TableManager tableManager = new TableManager(
+ name,
registry,
tblCfg,
raftMgr,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 7c805fd7ea..fc7f98adcb 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
+import org.apache.ignite.internal.table.message.TableMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
@@ -244,6 +245,7 @@ public class IgniteImpl implements Ignite {
SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
ComputeMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TableMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
@@ -330,6 +332,7 @@ public class IgniteImpl implements Ignite {
volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
distributedTblMgr = new TableManager(
+ name,
registry,
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
raftMgr,
@@ -701,6 +704,11 @@ public class IgniteImpl implements Ignite {
return partitionsStore;
}
+ /**
+ * Returns the Raft manager of this node. Intended for tests only.
+ *
+ * @return Raft manager.
+ */
+ @TestOnly
+ public Loza raftManager() {
+ return raftMgr;
+ }
+
@TestOnly
public ClusterNode node() {
return clusterSvc.topologyService().localMember();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index ef840a005c..417c22bba4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -236,6 +236,9 @@ public class MockedStructuresTest extends IgniteAbstractTest {
/** Inner initialisation. */
@BeforeEach
void before() throws Exception {
+ when(rm.messagingService()).thenReturn(mock(MessagingService.class));
+ when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+
revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
function.apply(0L).join();
@@ -773,6 +776,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
private TableManager createTableManager() {
TableManager tableManager = new TableManager(
+ "",
revisionUpdater,
tblsCfg,
rm,
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index e0e052432e..46ce848876 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -235,6 +235,11 @@
<artifactId>ignite-configuration-annotation-processor</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<configuration>
<annotationProcessorPaths>
@@ -243,6 +248,11 @@
<artifactId>ignite-configuration-annotation-processor</artifactId>
<version>${project.version}</version>
</path>
+ <path>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </path>
</annotationProcessorPaths>
</configuration>
</plugin>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b64e99d4d1..32bb02cf2f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -25,6 +25,7 @@ import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.ge
import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.internal.utils.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
import static org.apache.ignite.internal.utils.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
@@ -36,8 +37,10 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssign
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -106,11 +109,17 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.internal.table.message.HasDataRequest;
+import org.apache.ignite.internal.table.message.HasDataRequestBuilder;
+import org.apache.ignite.internal.table.message.HasDataResponse;
+import org.apache.ignite.internal.table.message.TableMessageGroup;
+import org.apache.ignite.internal.table.message.TableMessagesFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteObjectName;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.utils.RebalanceUtil;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
@@ -122,6 +131,7 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
@@ -146,6 +156,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Timeout to complete the tablesByIdVv on revision update. */
private static final long TABLES_COMPLETE_TIMEOUT = 120;
+ private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3);
+
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(TableManager.class);
@@ -218,9 +230,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Rebalance scheduler pool size. */
private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
+
/**
* Creates a new table manager.
*
+ * @param nodeName Node name.
* @param registry Registry for versioned values.
* @param tablesCfg Tables configuration.
* @param raftMgr Raft manager.
@@ -232,6 +247,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* tables.
*/
public TableManager(
+ String nodeName,
Consumer<Function<Long, CompletableFuture<?>>> registry,
TablesConfiguration tablesCfg,
Loza raftMgr,
@@ -312,7 +328,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
});
rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- new NamedThreadFactory("rebalance-scheduler", LOG));
+ NamedThreadFactory.create(nodeName, "rebalance-scheduler", LOG));
ioExecutor = new ThreadPoolExecutor(
Math.min(Utils.cpus() * 3, 25),
@@ -320,7 +336,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
100,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
- new NamedThreadFactory("tableManager-io", LOG));
+ NamedThreadFactory.create(nodeName, "tableManager-io", LOG));
}
/** {@inheritDoc} */
@@ -365,6 +381,47 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return completedFuture(false);
}
});
+
+ addMessageHandler(raftMgr.messagingService());
+ }
+
+    /**
+     * Registers the table manager's handler for the {@link TableMessageGroup} messages.
+     *
+     * <p>The handler answers {@link HasDataRequest}s, which query whether this node has any data for a specific
+     * partition of a table; other messages of the group are ignored.
+     *
+     * @param messagingService Messaging service.
+     */
+    private void addMessageHandler(MessagingService messagingService) {
+        messagingService.addMessageHandler(TableMessageGroup.class, (message, senderAddr, correlationId) -> {
+            if (!(message instanceof HasDataRequest)) {
+                return;
+            }
+
+            // The request is answered through respond(), which takes the correlation id.
+            assert correlationId != null;
+
+            HasDataRequest request = (HasDataRequest) message;
+
+            UUID tableId = request.tableId();
+            int partitionId = request.partitionId();
+
+            TableImpl table = tablesByIdVv.latest().get(tableId);
+
+            // If the table is unknown or node's recovery process is incomplete (no partition storage),
+            // then this node's partition storage is considered empty.
+            MvPartitionStorage mvPartition = table == null
+                    ? null
+                    : table.internalTable().storage().getMvPartition(partitionId);
+
+            // An applied index greater than 0 means that there is data.
+            boolean hasData = mvPartition != null && mvPartition.lastAppliedIndex() > 0;
+
+            messagingService.respond(senderAddr, TABLE_MESSAGES_FACTORY.hasDataResponse().result(hasData).build(), correlationId);
+        });
+    }
/**
@@ -423,7 +480,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
ctx.storageRevision(),
ctx.oldValue().name(),
((ExtendedTableView) ctx.oldValue()).id(),
- (List<List<ClusterNode>>) ByteUtils.fromBytes(((ExtendedTableView) ctx.oldValue()).assignments())
+ ByteUtils.fromBytes(((ExtendedTableView) ctx.oldValue()).assignments())
);
} finally {
busyLock.leaveBusy();
@@ -459,9 +516,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
for (int i = 0; i < partCnt; i++) {
String partId = partitionRaftGroupName(((ExtendedTableConfiguration) tblCfg).id().value(), i);
- futures[i] = updatePendingAssignmentsKeys(
- tblCfg.name().value(), partId, baselineMgr.nodes(),
- partCnt, newReplicas,
+ futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), partId, baselineMgr.nodes(), newReplicas,
replicasCtx.storageRevision(), metaStorageMgr, i);
}
@@ -506,10 +561,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
long causalityToken = assignmentsCtx.storageRevision();
- List<List<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == null ? null :
- (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.oldValue());
+ List<Set<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == null ? null : ByteUtils.fromBytes(assignmentsCtx.oldValue());
- List<List<ClusterNode>> newAssignments = (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.newValue());
+ List<Set<ClusterNode>> newAssignments = ByteUtils.fromBytes(assignmentsCtx.newValue());
// Empty assignments might be a valid case if tables are created from within cluster init HOCON
// configuration, which is not supported now.
@@ -519,16 +573,21 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+ List<Set<ClusterNode>> assignmentsLatest = ByteUtils.fromBytes(directProxy(tblCfg.assignments()).value());
+
+ TopologyService topologyService = raftMgr.topologyService();
+ ClusterNode localMember = topologyService.localMember();
+
// TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
// TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
// TODO: be exact same amount of partitions and replicas for both old and new assignments
for (int i = 0; i < partitions; i++) {
int partId = i;
- List<ClusterNode> oldPartAssignment = oldAssignments == null ? Collections.emptyList() :
+ Set<ClusterNode> oldPartAssignment = oldAssignments == null ? Collections.emptySet() :
oldAssignments.get(partId);
- List<ClusterNode> newPartAssignment = newAssignments.get(partId);
+ Set<ClusterNode> newPartAssignment = newAssignments.get(partId);
// Create new raft nodes according to new assignments.
tablesByIdVv.update(causalityToken, (tablesById, e) -> {
@@ -538,13 +597,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
InternalTable internalTbl = tablesById.get(tblId).internalTable();
+ MvTableStorage storage = internalTbl.storage();
+ boolean isInMemory = storage.isVolatile();
+
// TODO: IGNITE-17197 Remove assert after the ticket is resolved.
assert internalTbl.storage() instanceof MvTableStorage :
"Only multi version storages are supported. Current storage is a " + internalTbl.storage().getClass().getName();
// start new nodes, only if it is table creation
// other cases will be covered by rebalance logic
- List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptyList();
+ Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptySet();
String grpId = partitionRaftGroupName(tblId, partId);
@@ -552,33 +614,72 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
startGroupFut = CompletableFuture
- .supplyAsync(
- () -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
+ .supplyAsync(() -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
.thenComposeAsync((partitionStorage) -> {
- RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
- newPartAssignment);
-
- try {
- raftMgr.startRaftGroupNode(
- grpId,
- newPartAssignment,
- new PartitionListener(tblId, new VersionedRowStore(partitionStorage, txManager)),
- new RebalanceRaftGroupEventsListener(
- metaStorageMgr,
- tablesCfg.tables().get(tablesById.get(tblId).name()),
- grpId,
- partId,
- busyLock,
- movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
- rebalanceScheduler
- ),
- groupOptions
- );
-
- return CompletableFuture.completedFuture(null);
- } catch (NodeStoppingException ex) {
- return CompletableFuture.failedFuture(ex);
+ boolean hasData = partitionStorage.lastAppliedIndex() > 0;
+
+ CompletableFuture<Boolean> fut;
+
+ if (isInMemory || !hasData) {
+ Set<ClusterNode> partAssignments = assignmentsLatest.get(partId);
+
+ fut = queryDataNodesCount(tblId, partId, partAssignments).thenApply(dataNodesCount -> {
+ boolean fullPartitionRestart = dataNodesCount == 0;
+
+ if (fullPartitionRestart) {
+ return true;
+ }
+
+ boolean majorityAvailable = dataNodesCount >= (partAssignments.size() / 2) + 1;
+
+ if (majorityAvailable) {
+ RebalanceUtil.startPeerRemoval(partitionRaftGroupName(tblId, partId), localMember,
+ metaStorageMgr);
+
+ return false;
+ } else {
+ // No majority and not a full partition restart - need to restart nodes
+ // with current partition.
+ String msg = "Unable to start partition " + partId + ". Majority not available.";
+
+ throw new IgniteInternalException(msg);
+ }
+ });
+ } else {
+ fut = CompletableFuture.completedFuture(true);
}
+
+ return fut.thenComposeAsync(startGroup -> {
+ if (!startGroup) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
+ newPartAssignment);
+
+ try {
+ raftMgr.startRaftGroupNode(
+ grpId,
+ newPartAssignment,
+ new PartitionListener(tblId, new VersionedRowStore(partitionStorage, txManager)),
+ new RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ tablesCfg.tables().get(tablesById.get(tblId).name()),
+ grpId,
+ partId,
+ busyLock,
+ movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
+ this::calculateAssignments,
+ rebalanceScheduler
+ ),
+ groupOptions
+ );
+
+ return CompletableFuture.completedFuture(null);
+ } catch (NodeStoppingException ex) {
+ return CompletableFuture.failedFuture(ex);
+ }
+ }, ioExecutor);
}, ioExecutor);
}
@@ -606,11 +707,37 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture.allOf(futures).join();
}
+ /**
+ * Asks every node from the partition assignments whether it has data for the partition and counts the positive answers.
+ *
+ * @param tblId Table id.
+ * @param partId Partition id.
+ * @param partAssignments Partition assignments.
+ * @return A future that will hold the quantity of data nodes.
+ */
+ private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int partId, Set<ClusterNode> partAssignments) {
+ HasDataRequestBuilder builder = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId);
+
+ // One future per assigned node; a request that completes exceptionally is counted as "no data".
+ //noinspection unchecked
+ CompletableFuture<Boolean>[] answers = partAssignments.stream().map(node -> {
+ HasDataRequest hasDataRequest = builder.build();
+
+ CompletableFuture<Boolean> answer = raftMgr.messagingService()
+ .invoke(node, hasDataRequest, QUERY_DATA_NODES_COUNT_TIMEOUT)
+ .thenApply(response -> {
+ assert response instanceof HasDataResponse : response;
+
+ return ((HasDataResponse) response).result();
+ });
+
+ return answer.exceptionally(unused -> false);
+ }).toArray(CompletableFuture[]::new);
+
+ // All futures are already completed when the callback runs, so join() does not block here.
+ return CompletableFuture.allOf(answers)
+ .thenApply(unused -> Arrays.stream(answers).map(CompletableFuture::join).filter(Boolean.TRUE::equals).count());
+ }
+
+
private RaftGroupOptions groupOptionsForPartition(
InternalTable internalTbl,
ExtendedTableConfiguration tableConfig,
MvPartitionStorage partitionStorage,
- List<ClusterNode> peers
+ Set<ClusterNode> peers
) {
RaftGroupOptions raftGroupOptions;
@@ -767,7 +894,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @param tblId Table id.
* @param assignment Affinity assignment.
*/
- private void dropTableLocally(long causalityToken, String name, UUID tblId, List<List<ClusterNode>> assignment) {
+ private void dropTableLocally(long causalityToken, String name, UUID tblId, List<Set<ClusterNode>> assignment) {
try {
int partitions = assignment.size();
@@ -805,6 +932,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
}
+ /**
+ * Calculates the assignments of a single partition from the current baseline nodes and the replica count of the table.
+ *
+ * @param tableCfg Table configuration.
+ * @param partNum Partition number.
+ * @return Nodes assigned to the partition.
+ */
+ private Set<ClusterNode> calculateAssignments(TableConfiguration tableCfg, int partNum) {
+ var baselineNodes = baselineMgr.nodes();
+ int replicas = tableCfg.value().replicas();
+
+ return AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+ }
+
/**
* Compounds a RAFT group unique name.
*
@@ -881,7 +1012,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
baselineMgr.nodes(),
tableChange.partitions(),
- tableChange.replicas())))
+ tableChange.replicas(),
+ HashSet::new)))
// Table schema preparation.
.changeSchemas(schemasCh -> schemasCh.create(
String.valueOf(INITIAL_SCHEMA_VERSION),
@@ -1420,7 +1552,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String partId = partitionRaftGroupName(tblId, part);
// Assignments of the pending rebalance that we received through the meta storage watch mechanism.
- List<ClusterNode> newPeers = ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value()));
+ Set<ClusterNode> newPeers = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).join();
@@ -1440,12 +1572,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(partId),
pendingAssignmentsWatchEvent.revision()).join().value();
- List<ClusterNode> assignments = stableAssignments == null
+ Set<ClusterNode> assignments = stableAssignments == null
// This is for the case when the first rebalance occurs.
- ? ((List<List<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
- : (List<ClusterNode>) ByteUtils.fromBytes(stableAssignments);
+ ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
+ : ByteUtils.fromBytes(stableAssignments);
- ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
+ ClusterNode localMember = raftMgr.topologyService().localMember();
var deltaPeers = newPeers.stream()
.filter(p -> !assignments.contains(p))
@@ -1478,6 +1610,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
part,
busyLock,
movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
+ TableManager.this::calculateAssignments,
rebalanceScheduler
);
@@ -1547,17 +1680,17 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String partId = partitionRaftGroupName(tblId, part);
- var stableAssignments = (List<ClusterNode>) ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+ Set<ClusterNode> stableAssignments = ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(partId),
stableAssignmentsWatchEvent.revision()).join().value();
- List<ClusterNode> pendingAssignments = pendingFromMetastorage == null
- ? Collections.emptyList()
- : (List<ClusterNode>) ByteUtils.fromBytes(pendingFromMetastorage);
+ Set<ClusterNode> pendingAssignments = pendingFromMetastorage == null
+ ? Collections.emptySet()
+ : ByteUtils.fromBytes(pendingFromMetastorage);
try {
- ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
+ ClusterNode localMember = raftMgr.topologyService().localMember();
if (!stableAssignments.contains(localMember) && !pendingAssignments.contains(localMember)) {
raftMgr.stopRaftGroup(partId);
@@ -1577,6 +1710,38 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
LOG.warn("Unable to process stable assignments event", e);
}
});
+
+ // Reacts to changes of the "switch reduce" assignments keys of table partitions.
+ metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), new WatchListener() {
+ @Override
+ public boolean onUpdate(@NotNull WatchEvent evt) {
+ ByteArray key = evt.entryEvent().newEntry().key();
+
+ int partitionNumber = extractPartitionNumber(key);
+ UUID tblId = extractTableId(key, ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
+
+ String partitionId = partitionRaftGroupName(tblId, partitionNumber);
+
+ TableImpl tbl = tablesByIdVv.latest().get(tblId);
+
+ if (tbl == null) {
+ // The table is absent from the latest local view, so there is no configuration to read the replica
+ // count from. Skip the event instead of failing with an NPE and keep the watch registered.
+ // NOTE(review): presumably this happens when the table has been dropped - confirm.
+ LOG.info("Skipping switch reduce event for unknown table [tblId={}, partition={}]", tblId, partitionNumber);
+
+ return true;
+ }
+
+ TableConfiguration tblCfg = tablesCfg.tables().get(tbl.name());
+
+ RebalanceUtil.handleReduceChanged(
+ metaStorageMgr,
+ baselineMgr.nodes(),
+ tblCfg.value().replicas(),
+ partitionNumber,
+ partitionId,
+ evt
+ );
+
+ return true;
+ }
+
+ @Override
+ public void onError(@NotNull Throwable e) {
+ LOG.warn("Unable to process switch reduce event", e);
+ }
+ });
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index e2e8c2f871..54179102d4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -17,18 +17,25 @@
package org.apache.ignite.internal.table.distributed.raft;
+import static org.apache.ignite.internal.metastorage.client.CompoundCondition.and;
import static org.apache.ignite.internal.metastorage.client.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.client.Conditions.revision;
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
+import static org.apache.ignite.internal.utils.RebalanceUtil.intersect;
import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.readClusterNodes;
+import static org.apache.ignite.internal.utils.RebalanceUtil.resolveClusterNodes;
import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.subtract;
+import static org.apache.ignite.internal.utils.RebalanceUtil.switchAppendKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.switchReduceKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.union;
import static org.apache.ignite.raft.jraft.core.NodeImpl.LEADER_STEPPED_DOWN;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,6 +43,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -43,13 +51,15 @@ import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.SimpleCondition;
+import org.apache.ignite.internal.metastorage.client.Update;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
@@ -65,6 +75,36 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
/** Ignite logger. */
private static final IgniteLogger LOG = Loggers.forClass(RebalanceRaftGroupEventsListener.class);
+ /** Number of retries of the current rebalance in case of errors. */
+ private static final int REBALANCE_RETRY_THRESHOLD = 10;
+
+ /** Delay between an unsuccessful rebalance attempt and the next attempt, in milliseconds. */
+ public static final int REBALANCE_RETRY_DELAY_MS = 200;
+
+ /** Success code for the MetaStorage switch append assignments change. */
+ private static final int SWITCH_APPEND_SUCCESS = 1;
+
+ /** Success code for the MetaStorage switch reduce assignments change. */
+ private static final int SWITCH_REDUCE_SUCCESS = 2;
+
+ /** Success code for the MetaStorage pending rebalance change. */
+ private static final int SCHEDULE_PENDING_REBALANCE_SUCCESS = 3;
+
+ /** Success code for the MetaStorage stable assignments change. */
+ private static final int FINISH_REBALANCE_SUCCESS = 4;
+
+ /** Failure code for the MetaStorage switch append assignments change. */
+ private static final int SWITCH_APPEND_FAIL = -SWITCH_APPEND_SUCCESS;
+
+ /** Failure code for the MetaStorage switch reduce assignments change. */
+ private static final int SWITCH_REDUCE_FAIL = -SWITCH_REDUCE_SUCCESS;
+
+ /** Failure code for the MetaStorage pending rebalance change. */
+ private static final int SCHEDULE_PENDING_REBALANCE_FAIL = -SCHEDULE_PENDING_REBALANCE_SUCCESS;
+
+ /** Failure code for the MetaStorage stable assignments change. */
+ private static final int FINISH_REBALANCE_FAIL = -FINISH_REBALANCE_SUCCESS;
+
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
@@ -89,11 +129,8 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
/** Attempts to retry the current rebalance in case of errors. */
private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
- /** Number of retrying of the current rebalance in case of errors. */
- private static final int REBALANCE_RETRY_THRESHOLD = 10;
-
- /** Delay between unsuccessful trial of a rebalance and a new trial, ms. */
- public static final int REBALANCE_RETRY_DELAY_MS = 200;
+ /** Function that calculates assignments for table's partition. */
+ private final BiFunction<TableConfiguration, Integer, Set<ClusterNode>> calculateAssignmentsFn;
/**
* Constructs new listener.
@@ -102,6 +139,9 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
* @param tblConfiguration Table configuration.
* @param partId Partition id.
* @param partNum Partition number.
+ * @param busyLock Busy lock.
+ * @param movePartitionFn Function that moves partition between nodes.
+ * @param calculateAssignmentsFn Function that calculates assignments for table's partition.
* @param rebalanceScheduler Executor for scheduling rebalance retries.
*/
public RebalanceRaftGroupEventsListener(
@@ -111,6 +151,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
int partNum,
IgniteSpinBusyLock busyLock,
BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn,
+ BiFunction<TableConfiguration, Integer, Set<ClusterNode>> calculateAssignmentsFn,
ScheduledExecutorService rebalanceScheduler) {
this.metaStorageMgr = metaStorageMgr;
this.tblConfiguration = tblConfiguration;
@@ -118,6 +159,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
this.partNum = partNum;
this.busyLock = busyLock;
this.movePartitionFn = movePartitionFn;
+ this.calculateAssignmentsFn = calculateAssignmentsFn;
this.rebalanceScheduler = rebalanceScheduler;
}
@@ -140,7 +182,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
Entry pendingEntry = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get();
if (!pendingEntry.empty()) {
- List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+ Set<ClusterNode> pendingNodes = ByteUtils.fromBytes(pendingEntry.value());
LOG.info("New leader elected. Going to reconfigure peers [group={}, partition={}, table={}, peers={}]",
partId, partNum, tblConfiguration.name().value(), pendingNodes);
@@ -252,92 +294,193 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
*/
private void doOnNewPeersConfigurationApplied(List<PeerId> peers) {
try {
- Map<ByteArray, Entry> keys = metaStorageMgr.getAll(
+ ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsKey(partId);
+ ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(partId);
+ ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(partId);
+ ByteArray switchReduceKey = switchReduceKey(partId);
+ ByteArray switchAppendKey = switchAppendKey(partId);
+
+ // Read all rebalance-related keys of the partition in one call; their revisions are used below
+ // as preconditions of the conditional metastorage update.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
+ Map<ByteArray, Entry> values = metaStorageMgr.getAll(
Set.of(
- plannedPartAssignmentsKey(partId),
- pendingPartAssignmentsKey(partId),
- stablePartAssignmentsKey(partId))).get();
+ plannedPartAssignmentsKey,
+ pendingPartAssignmentsKey,
+ stablePartAssignmentsKey,
+ switchReduceKey,
+ switchAppendKey
+ )
+ ).get();
- Entry plannedEntry = keys.get(plannedPartAssignmentsKey(partId));
+ Entry stableEntry = values.get(stablePartAssignmentsKey);
+ Entry pendingEntry = values.get(pendingPartAssignmentsKey);
+ Entry plannedEntry = values.get(plannedPartAssignmentsKey);
+ Entry switchReduceEntry = values.get(switchReduceKey);
+ Entry switchAppendEntry = values.get(switchAppendKey);
- List<ClusterNode> appliedPeers = resolveClusterNodes(peers,
- keys.get(pendingPartAssignmentsKey(partId)).value(), keys.get(stablePartAssignmentsKey(partId)).value());
+ Set<ClusterNode> calculatedAssignments = calculateAssignmentsFn.apply(tblConfiguration, partNum);
- tblConfiguration.change(ch -> {
- List<List<ClusterNode>> assignments =
- (List<List<ClusterNode>>) ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
- assignments.set(partNum, appliedPeers);
- ((ExtendedTableChange) ch).changeAssignments(ByteUtils.toBytes(assignments));
- }).get();
-
- if (plannedEntry.value() != null) {
- if (!metaStorageMgr.invoke(If.iif(
- revision(plannedPartAssignmentsKey(partId)).eq(plannedEntry.revision()),
- ops(
- put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
- put(pendingPartAssignmentsKey(partId), plannedEntry.value()),
- remove(plannedPartAssignmentsKey(partId)))
- .yield(true),
- ops().yield(false))).get().getAsBoolean()) {
- LOG.info("Planned key changed while trying to update rebalance information. Going to retry"
- + " [key={}, partition={}, table={}, appliedPeers={}]",
- plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+ // Applied raft peers resolved to cluster nodes; this set is written as the new stable assignments.
+ Set<ClusterNode> stable = resolveClusterNodes(peers, pendingEntry.value(), stableEntry.value());
- doOnNewPeersConfigurationApplied(peers);
- }
+ Set<ClusterNode> retrievedSwitchReduce = readClusterNodes(switchReduceEntry);
+ Set<ClusterNode> retrievedSwitchAppend = readClusterNodes(switchAppendEntry);
+ Set<ClusterNode> retrievedStable = readClusterNodes(stableEntry);
- LOG.info("Rebalance finished. Going to schedule next rebalance [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
- partNum, tblConfiguration.name().value(), appliedPeers, ByteUtils.fromBytes(plannedEntry.value()));
- } else {
- if (!metaStorageMgr.invoke(If.iif(
- notExists(plannedPartAssignmentsKey(partId)),
- ops(put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
- remove(pendingPartAssignmentsKey(partId))).yield(true),
- ops().yield(false))).get().getAsBoolean()) {
- LOG.info("Planned key changed while trying to update rebalance information. Going to retry"
- + " [key={}, partition={}, table={}, appliedPeers={}]",
- plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+ // Were reduced
+ Set<ClusterNode> reducedNodes = subtract(retrievedSwitchReduce, stable);
- doOnNewPeersConfigurationApplied(peers);
- }
+ // Were added
+ Set<ClusterNode> addedNodes = subtract(stable, retrievedStable);
- LOG.info("Rebalance finished [partition={}, table={}, appliedPeers={}]",
- partNum, tblConfiguration.name().value(), appliedPeers);
- }
+ // For further reduction
+ Set<ClusterNode> calculatedSwitchReduce = subtract(retrievedSwitchReduce, reducedNodes);
- rebalanceAttempts.set(0);
- } catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-14693
- LOG.warn("Unable to commit partition configuration to metastore [table = {}, partition = {}]",
- e, tblConfiguration.name(), partNum);
- }
- }
+ // For further addition
+ Set<ClusterNode> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
+ calculatedSwitchAppend = subtract(calculatedSwitchAppend, addedNodes);
+ calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);
- private static List<ClusterNode> resolveClusterNodes(
- List<PeerId> peers, byte[] pendingAssignments, byte[] stableAssignments) {
- Map<NetworkAddress, ClusterNode> resolveRegistry = new HashMap<>();
+ Set<ClusterNode> calculatedPendingReduction = subtract(stable, retrievedSwitchReduce);
- if (pendingAssignments != null) {
- ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignments)).forEach(n -> resolveRegistry.put(n.address(), n));
- }
+ Set<ClusterNode> calculatedPendingAddition = union(stable, reducedNodes);
+ calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);
- if (stableAssignments != null) {
- ((List<ClusterNode>) ByteUtils.fromBytes(stableAssignments)).forEach(n -> resolveRegistry.put(n.address(), n));
- }
+ // eq(revision(assignments.stable), retrievedAssignmentsStable.revision)
+ SimpleCondition con1 = stableEntry.empty()
+ ? notExists(stablePartAssignmentsKey) :
+ revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+
+ // eq(revision(assignments.pending), retrievedAssignmentsPending.revision)
+ SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
- List<ClusterNode> resolvedNodes = new ArrayList<>(peers.size());
+ // eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
+ SimpleCondition con3 = switchReduceEntry.empty()
+ ? notExists(switchReduceKey) : revision(switchReduceKey).eq(switchReduceEntry.revision());
- for (PeerId p : peers) {
- var addr = NetworkAddress.from(p.getEndpoint().getIp() + ":" + p.getEndpoint().getPort());
+ // eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
+ SimpleCondition con4 = switchAppendEntry.empty()
+ ? notExists(switchAppendKey) : revision(switchAppendKey).eq(switchAppendEntry.revision());
- if (resolveRegistry.containsKey(addr)) {
- resolvedNodes.add(resolveRegistry.get(addr));
+ // All conditions combined with AND operator.
+ Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
+ tblConfiguration.change(ch -> {
+ List<Set<ClusterNode>> assignments = ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+ assignments.set(partNum, stable);
+ ((ExtendedTableChange) ch).changeAssignments(ByteUtils.toBytes(assignments));
+ }).get(10, TimeUnit.SECONDS);
+
+ Update successCase;
+ Update failCase;
+
+ byte[] stableByteArray = ByteUtils.toBytes(stable);
+ byte[] additionByteArray = ByteUtils.toBytes(calculatedPendingAddition);
+ byte[] reductionByteArray = ByteUtils.toBytes(calculatedPendingReduction);
+ byte[] switchReduceByteArray = ByteUtils.toBytes(calculatedSwitchReduce);
+ byte[] switchAppendByteArray = ByteUtils.toBytes(calculatedSwitchAppend);
+
+ // Pick the update to apply: a pending addition while nodes remain to be appended, otherwise a pending
+ // reduction while nodes remain to be reduced, otherwise either promote the planned assignments to pending
+ // or finish the rebalance by removing the pending key.
+ if (!calculatedSwitchAppend.isEmpty()) {
+ successCase = ops(
+ put(stablePartAssignmentsKey, stableByteArray),
+ put(pendingPartAssignmentsKey, additionByteArray),
+ put(switchReduceKey, switchReduceByteArray),
+ put(switchAppendKey, switchAppendByteArray)
+ ).yield(SWITCH_APPEND_SUCCESS);
+ failCase = ops().yield(SWITCH_APPEND_FAIL);
+ } else if (!calculatedSwitchReduce.isEmpty()) {
+ successCase = ops(
+ put(stablePartAssignmentsKey, stableByteArray),
+ put(pendingPartAssignmentsKey, reductionByteArray),
+ put(switchReduceKey, switchReduceByteArray),
+ put(switchAppendKey, switchAppendByteArray)
+ ).yield(SWITCH_REDUCE_SUCCESS);
+ failCase = ops().yield(SWITCH_REDUCE_FAIL);
} else {
- throw new IgniteInternalException("Can't find appropriate cluster node for raft group peer: " + p);
+ Condition con5;
+ if (plannedEntry.value() != null) {
+ // eq(revision(partition.assignments.planned), plannedEntry.revision)
+ con5 = revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+
+ successCase = ops(
+ put(stablePartAssignmentsKey, stableByteArray),
+ put(pendingPartAssignmentsKey, plannedEntry.value()),
+ remove(plannedPartAssignmentsKey)
+ ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
+
+ failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
+ } else {
+ // notExists(partition.assignments.planned)
+ con5 = notExists(plannedPartAssignmentsKey);
+
+ successCase = ops(
+ put(stablePartAssignmentsKey, stableByteArray),
+ remove(pendingPartAssignmentsKey)
+ ).yield(FINISH_REBALANCE_SUCCESS);
+
+ failCase = ops().yield(FINISH_REBALANCE_FAIL);
+ }
+
+ retryPreconditions = and(retryPreconditions, con5);
}
- }
- return resolvedNodes;
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
+ int res = metaStorageMgr.invoke(If.iif(retryPreconditions, successCase, failCase)).get().getAsInt();
+
+ // A negative code means the preconditions did not hold: log the reason and redo the whole procedure.
+ if (res < 0) {
+ switch (res) {
+ case SWITCH_APPEND_FAIL:
+ LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. Going to retry"
+ + " [partition={}, table={}, appliedPeers={}]", partNum, tblConfiguration.name(), stable);
+ break;
+ case SWITCH_REDUCE_FAIL:
+ LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. Going to retry"
+ + " [partition={}, table={}, appliedPeers={}]", partNum, tblConfiguration.name(), stable);
+ break;
+ case SCHEDULE_PENDING_REBALANCE_FAIL:
+ case FINISH_REBALANCE_FAIL:
+ LOG.info("Rebalance keys changed while trying to update rebalance information. Going to retry"
+ + " [partition={}, table={}, appliedPeers={}]", partNum, tblConfiguration.name(), stable);
+ break;
+ default:
+ assert false : res;
+ break;
+ }
+
+ doOnNewPeersConfigurationApplied(peers);
+ return;
+ }
+
+ switch (res) {
+ case SWITCH_APPEND_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance with addition"
+ + " [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
+ partNum, tblConfiguration.name().value(), stable, calculatedPendingAddition);
+ break;
+ case SWITCH_REDUCE_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance with reduction"
+ + " [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
+ partNum, tblConfiguration.name().value(), stable, calculatedPendingReduction);
+ break;
+ case SCHEDULE_PENDING_REBALANCE_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance [partition={}, table={}, appliedPeers={}, "
+ + "plannedPeers={}]",
+ partNum, tblConfiguration.name().value(), stable, ByteUtils.fromBytes(plannedEntry.value()));
+ break;
+ case FINISH_REBALANCE_SUCCESS:
+ LOG.info("Rebalance finished [partition={}, table={}, appliedPeers={}]",
+ partNum, tblConfiguration.name().value(), stable);
+ break;
+ default:
+ assert false : res;
+ break;
+ }
+
+ rebalanceAttempts.set(0);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag so that the owner of this thread can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+
+ // TODO: IGNITE-14693
+ LOG.warn("Unable to commit partition configuration to metastore [table = {}, partition = {}]",
+ e, tblConfiguration.name(), partNum);
+ }
}
/**
@@ -346,7 +489,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
* @param nodes List of cluster nodes to transform.
* @return List of transformed peers.
*/
- private static List<Peer> clusterNodesToPeers(List<ClusterNode> nodes) {
+ private static List<Peer> clusterNodesToPeers(Set<ClusterNode> nodes) {
List<Peer> peers = new ArrayList<>(nodes.size());
for (ClusterNode node : nodes) {
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataRequest.java
similarity index 58%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataRequest.java
index 8f684ba0c2..913319b97a 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataRequest.java
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute;
+package org.apache.ignite.internal.table.message;
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.network.annotations.MessageGroup;
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Message types for the Compute module.
+ * A message that queries a node whether it has data for the partition of a table.
*/
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
-public class ComputeMessageTypes {
- /**
- * Type for {@link ExecuteRequest}.
- */
- public static final short EXECUTE_REQUEST = 0;
+@Transferable(TableMessageGroup.HAS_DATA_REQUEST)
+public interface HasDataRequest extends NetworkMessage {
+ /** ID of the table. */
+ UUID tableId();
- /**
- * Type for {@link ExecuteResponse}.
- */
- public static final short EXECUTE_RESPONSE = 1;
+ /** ID of the partition. */
+ int partitionId();
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataResponse.java
similarity index 58%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataResponse.java
index 8f684ba0c2..5ecb678c35 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataResponse.java
@@ -15,24 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute;
+package org.apache.ignite.internal.table.message;
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Message types for the Compute module.
+ * A response to the {@link HasDataRequest}.
*/
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
-public class ComputeMessageTypes {
- /**
- * Type for {@link ExecuteRequest}.
- */
- public static final short EXECUTE_REQUEST = 0;
-
- /**
- * Type for {@link ExecuteResponse}.
- */
- public static final short EXECUTE_RESPONSE = 1;
+@Transferable(TableMessageGroup.HAS_DATA_RESPONSE)
+public interface HasDataResponse extends NetworkMessage {
+ /** {@code true} if a node has data for a partition of a table, {@code false} otherwise. */
+ boolean result();
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
similarity index 62%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
index 8f684ba0c2..8b9888617e 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
@@ -15,24 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute;
+package org.apache.ignite.internal.table.message;
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.network.annotations.MessageGroup;
/**
- * Message types for the Compute module.
+ * Table module message group.
*/
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
-public class ComputeMessageTypes {
- /**
- * Type for {@link ExecuteRequest}.
- */
- public static final short EXECUTE_REQUEST = 0;
+@MessageGroup(groupType = 8, groupName = "TableMessages")
+public class TableMessageGroup {
+ /** Message type for {@link HasDataRequest}. */
+ static final short HAS_DATA_REQUEST = 0;
- /**
- * Type for {@link ExecuteResponse}.
- */
- public static final short EXECUTE_RESPONSE = 1;
+ /** Message type for {@link HasDataResponse}. */
+ static final short HAS_DATA_RESPONSE = 1;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 9f0f7db412..8478648356 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -20,22 +20,38 @@ package org.apache.ignite.internal.utils;
import static org.apache.ignite.internal.metastorage.client.CompoundCondition.and;
import static org.apache.ignite.internal.metastorage.client.CompoundCondition.or;
import static org.apache.ignite.internal.metastorage.client.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.client.Conditions.revision;
import static org.apache.ignite.internal.metastorage.client.Conditions.value;
+import static org.apache.ignite.internal.metastorage.client.If.iif;
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Conditions;
+import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.Operations;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.entity.PeerId;
import org.jetbrains.annotations.NotNull;
/**
@@ -72,7 +88,6 @@ public class RebalanceUtil {
* @param tableName Table name.
* @param partId Unique identifier of a partition.
* @param baselineNodes Nodes in baseline.
- * @param partitions Number of partitions in a table.
* @param replicas Number of replicas for a table.
* @param revision Revision of Meta Storage that is specific for the assignment update.
* @param metaStorageMgr Meta Storage manager.
@@ -80,7 +95,7 @@ public class RebalanceUtil {
*/
public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys(
String tableName, String partId, Collection<ClusterNode> baselineNodes,
- int partitions, int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
+ int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
ByteArray partAssignmentsPendingKey = pendingPartAssignmentsKey(partId);
@@ -89,8 +104,9 @@ public class RebalanceUtil {
ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
- byte[] partAssignmentsBytes = ByteUtils.toBytes(
- AffinityUtils.calculateAssignments(baselineNodes, partitions, replicas).get(partNum));
+ Set<ClusterNode> partAssignments = AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+
+ byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
// if empty(partition.change.trigger.revision) || partition.change.trigger.revision < event.revision:
// if empty(partition.assignments.pending) && partition.assignments.stable != calcPartAssignments():
@@ -104,13 +120,13 @@ public class RebalanceUtil {
// remove(partition.assignments.planned)
// else:
// skip
- var iif = If.iif(or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
- If.iif(and(notExists(partAssignmentsPendingKey), value(partAssignmentsStableKey).ne(partAssignmentsBytes)),
+ var iif = iif(or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
+ iif(and(notExists(partAssignmentsPendingKey), value(partAssignmentsStableKey).ne(partAssignmentsBytes)),
ops(
put(partAssignmentsPendingKey, partAssignmentsBytes),
put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
).yield(PENDING_KEY_UPDATED),
- If.iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
+ iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
ops(
put(partAssignmentsPlannedKey, partAssignmentsBytes),
put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
@@ -157,6 +173,12 @@ public class RebalanceUtil {
/** Key prefix for stable assignments. */
public static final String STABLE_ASSIGNMENTS_PREFIX = "assignments.stable.";
+ /** Key prefix for switch reduce assignments. */
+ public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX = "assignments.switch.reduce.";
+
+ /** Key prefix for switch append assignments. */
+ public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = "assignments.switch.append.";
+
/**
* Key that is needed for the rebalance algorithm.
*
@@ -202,13 +224,46 @@ public class RebalanceUtil {
}
/**
- * Extract table id from pending key of partition.
+ * Key that is needed for the rebalance algorithm.
+ *
+ * @param partId Unique identifier of a partition.
+ * @return Key for a partition.
+ * @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
+ */
+    public static ByteArray switchReduceKey(String partId) {
+        // The key is the switch reduce prefix immediately followed by the partition id.
+        String reduceKey = ASSIGNMENTS_SWITCH_REDUCE_PREFIX + partId;
+
+        return new ByteArray(reduceKey);
+    }
+
+    /**
+     * Creates the key of the switch append assignments of a partition, used by the rebalance algorithm.
+     * The key is {@link #ASSIGNMENTS_SWITCH_APPEND_PREFIX} immediately followed by the partition id.
+     *
+     * @param partId Unique identifier of a partition.
+     * @return Key for a partition.
+     * @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
+     */
+    public static ByteArray switchAppendKey(String partId) {
+        String appendKey = ASSIGNMENTS_SWITCH_APPEND_PREFIX + partId;
+
+        return new ByteArray(appendKey);
+    }
+
+    /**
+     * Extracts the table id from a metastorage key of a partition that carries no prefix, i.e. a key whose string form
+     * starts with the table id directly followed by {@code "_part_"}.
+     *
+     * @param key Key.
+     * @return Table id.
+     */
+    public static UUID extractTableId(ByteArray key) {
+        // No prefix to skip: the table id starts at the very beginning of the key.
+        String noPrefix = "";
+
+        return extractTableId(key, noPrefix);
+    }
+
+ /**
+ * Extract table id from a metastorage key of partition.
*
* @param key Key.
+ * @param prefix Key prefix.
* @return Table id.
*/
public static UUID extractTableId(ByteArray key, String prefix) {
- var strKey = key.toString();
+ String strKey = key.toString();
return UUID.fromString(strKey.substring(prefix.length(), strKey.indexOf("_part_")));
}
@@ -235,4 +290,209 @@ public class RebalanceUtil {
// As long as we don't have a general failure handler, we assume that all errors are recoverable.
return true;
}
+
+    /**
+     * Starts the process of removing peer from raft group if that peer has in-memory storage or if its
+     * storage has been cleared.
+     *
+     * <p>The node is added to the set stored under the partition's switch reduce key. The update is a read-modify-write:
+     * the current value is read, the node is added, and the result is written back only if the key still has the revision
+     * that was read (or still does not exist, when there was no value). If the conditional write reports failure, the whole
+     * operation is retried from the read.
+     *
+     * @param partId Partition's raft group id.
+     * @param clusterNode Cluster node to be removed from peers.
+     * @param metaStorageMgr MetaStorage manager.
+     * @return Completable future that signifies the completion of this operation.
+     */
+    public static CompletableFuture<Void> startPeerRemoval(String partId, ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+        ByteArray key = switchReduceKey(partId);
+
+        return metaStorageMgr.get(key)
+                .thenCompose(retrievedAssignmentsSwitchReduce -> {
+                    byte[] prevValue = retrievedAssignmentsSwitchReduce.value();
+
+                    if (prevValue != null) {
+                        Set<ClusterNode> prev = ByteUtils.fromBytes(prevValue);
+
+                        prev.add(clusterNode);
+
+                        // Write back only if nobody has touched the key since it was read.
+                        return metaStorageMgr.invoke(
+                                revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
+                                Operations.put(key, ByteUtils.toBytes(prev)),
+                                Operations.noop()
+                        );
+                    } else {
+                        // Explicitly typed (a bare "var ... = new HashSet<>()" infers HashSet<Object>).
+                        // Must stay a mutable HashSet: a later call deserializes this value and adds to it.
+                        Set<ClusterNode> newValue = new HashSet<>();
+
+                        newValue.add(clusterNode);
+
+                        // Create the key only if it still does not exist.
+                        return metaStorageMgr.invoke(
+                                Conditions.notExists(key),
+                                Operations.put(key, ByteUtils.toBytes(newValue)),
+                                Operations.noop()
+                        );
+                    }
+                }).thenCompose(res -> {
+                    if (!res) {
+                        // The conditional update was not applied, start over with a fresh read.
+                        return startPeerRemoval(partId, clusterNode, metaStorageMgr);
+                    }
+
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
+    /**
+     * Handles assignments switch reduce changed updating pending assignments if there is no rebalancing in progress.
+     * If there is rebalancing in progress, then new assignments will be applied when rebalance finishes.
+     *
+     * <p>If the switch reduce set carried by the event is empty, nothing is calculated or written and a completed future
+     * is returned right away. Otherwise the pending assignments are the calculated partition assignments minus the switch
+     * reduce set, and they are written by a single conditional MetaStorage invocation (see the pseudocode in the body).
+     *
+     * @param metaStorageMgr MetaStorage manager.
+     * @param baselineNodes Baseline nodes.
+     * @param replicas Replicas count.
+     * @param partNum Number of the partition.
+     * @param partId Partition's raft group id.
+     * @param event Assignments switch reduce change event.
+     * @return Completable future that signifies the completion of this operation.
+     */
+    public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> baselineNodes,
+            int replicas, int partNum, String partId, WatchEvent event) {
+        Entry entry = event.entryEvent().newEntry();
+
+        Set<ClusterNode> switchReduce = ByteUtils.fromBytes(entry.value());
+
+        // Nothing to remove: return before the affinity calculation and the serialization below.
+        if (switchReduce.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Set<ClusterNode> assignments = AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+
+        Set<ClusterNode> pendingAssignments = subtract(assignments, switchReduce);
+
+        byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+
+        ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+        ByteArray stableKey = stablePartAssignmentsKey(partId);
+        ByteArray changeTriggerKey = partChangeTriggerKey(partId);
+
+        // Revision of the entry that triggered this handler.
+        byte[] rev = ByteUtils.longToBytes(entry.revision());
+
+        // Here is what happens in the MetaStorage:
+        // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < revision) && notExists(pendingKey) && notExists(stableKey)) {
+        //     put(pendingKey, pending)
+        //     put(stableKey, assignments)
+        //     put(changeTriggerKey, revision)
+        // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey) < revision) && notExists(pendingKey)) {
+        //     put(pendingKey, pending)
+        //     put(changeTriggerKey, revision)
+        // }
+
+        If resultingOperation = iif(
+                and(
+                        or(notExists(changeTriggerKey), value(changeTriggerKey).lt(rev)),
+                        and(notExists(pendingKey), (notExists(stableKey)))
+                ),
+                ops(
+                        put(pendingKey, pendingByteArray),
+                        put(stableKey, assignmentsByteArray),
+                        put(changeTriggerKey, rev)
+                ).yield(),
+                iif(
+                        and(
+                                or(notExists(changeTriggerKey), value(changeTriggerKey).lt(rev)),
+                                notExists(pendingKey)
+                        ),
+                        ops(
+                                put(pendingKey, pendingByteArray),
+                                put(changeTriggerKey, rev)
+                        ).yield(),
+                        ops().yield()
+                )
+        );
+
+        return metaStorageMgr.invoke(resultingOperation).thenApply(unused -> null);
+    }
+
+    /**
+     * Builds a set of cluster nodes based on a list of peers, pending and stable assignments.
+     * Every peer is looked up by its {@code ip:port} address among the nodes of the pending and stable assignments
+     * (a stable assignment replaces a pending one registered under the same address).
+     *
+     * @param peers List of peers.
+     * @param pendingAssignments Byte array that contains serialized set of pending assignments, may be {@code null}.
+     * @param stableAssignments Byte array that contains serialized set of stable assignments, may be {@code null}.
+     * @return Resolved cluster nodes.
+     * @throws IgniteInternalException If a peer's address is present neither in pending nor in stable assignments.
+     */
+    public static Set<ClusterNode> resolveClusterNodes(List<PeerId> peers, byte[] pendingAssignments, byte[] stableAssignments) {
+        Map<NetworkAddress, ClusterNode> nodesByAddress = new HashMap<>();
+
+        if (pendingAssignments != null) {
+            Set<ClusterNode> pendingNodes = ByteUtils.fromBytes(pendingAssignments);
+
+            for (ClusterNode node : pendingNodes) {
+                nodesByAddress.put(node.address(), node);
+            }
+        }
+
+        if (stableAssignments != null) {
+            Set<ClusterNode> stableNodes = ByteUtils.fromBytes(stableAssignments);
+
+            for (ClusterNode node : stableNodes) {
+                nodesByAddress.put(node.address(), node);
+            }
+        }
+
+        Set<ClusterNode> result = new HashSet<>(peers.size());
+
+        for (PeerId peer : peers) {
+            var peerAddress = NetworkAddress.from(peer.getEndpoint().getIp() + ":" + peer.getEndpoint().getPort());
+
+            // The registry never holds null values, so a null here means the address is unknown.
+            ClusterNode node = nodesByAddress.get(peerAddress);
+
+            if (node == null) {
+                throw new IgniteInternalException("Can't find appropriate cluster node for raft group peer: " + peer);
+            }
+
+            result.add(node);
+        }
+
+        return result;
+    }
+
+    /**
+     * Reads a set of cluster nodes from a MetaStorage entry.
+     *
+     * @param entry MetaStorage entry.
+     * @return Cluster nodes deserialized from the entry value, or an empty set if the entry is empty.
+     */
+    public static Set<ClusterNode> readClusterNodes(Entry entry) {
+        if (!entry.empty()) {
+            return ByteUtils.fromBytes(entry.value());
+        }
+
+        return Collections.emptySet();
+    }
+
+    /**
+     * Removes nodes from set of nodes. Neither argument is modified, the result is a new set.
+     *
+     * @param minuend Set to remove nodes from.
+     * @param subtrahend Set of nodes to be removed.
+     * @return Nodes of {@code minuend} that are not contained in {@code subtrahend}.
+     */
+    public static Set<ClusterNode> subtract(Set<ClusterNode> minuend, Set<ClusterNode> subtrahend) {
+        Set<ClusterNode> difference = new HashSet<>();
+
+        for (ClusterNode node : minuend) {
+            if (!subtrahend.contains(node)) {
+                difference.add(node);
+            }
+        }
+
+        return difference;
+    }
+
+    /**
+     * Adds nodes to the set of nodes. Neither argument is modified, the result is a new set.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return Set that contains every node of {@code op1} and every node of {@code op2}.
+     */
+    public static Set<ClusterNode> union(Set<ClusterNode> op1, Set<ClusterNode> op2) {
+        Set<ClusterNode> merged = new HashSet<>(op1);
+
+        merged.addAll(op2);
+
+        return merged;
+    }
+
+    /**
+     * Returns an intersection of two sets of nodes. Neither argument is modified, the result is a new set.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return Nodes of {@code op1} that are also contained in {@code op2}.
+     */
+    public static Set<ClusterNode> intersect(Set<ClusterNode> op1, Set<ClusterNode> op2) {
+        Set<ClusterNode> common = new HashSet<>();
+
+        for (ClusterNode node : op1) {
+            if (op2.contains(node)) {
+                common.add(node);
+            }
+        }
+
+        return common;
+    }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index f694cc0ea8..1f111133a8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -46,7 +46,9 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -248,6 +250,9 @@ public class TableManagerTest extends IgniteAbstractTest {
/** Before all test scenarios. */
@BeforeEach
void before() {
+ when(rm.messagingService()).thenReturn(mock(MessagingService.class));
+ when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+
revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
function.apply(0L).join();
@@ -305,10 +310,10 @@ public class TableManagerTest extends IgniteAbstractTest {
var extConfCh = ((ExtendedTableChange) tableChange);
- ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
+ ArrayList<Set<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
for (int part = 0; part < PARTITIONS; part++) {
- assignment.add(new ArrayList<>(Collections.singleton(node)));
+ assignment.add(new HashSet<>(Collections.singleton(node)));
}
extConfCh.changeAssignments(ByteUtils.toBytes(assignment))
@@ -761,6 +766,7 @@ public class TableManagerTest extends IgniteAbstractTest {
*/
private TableManager createTableManager(CompletableFuture<TableManager> tblManagerFut, boolean waitingSqlSchema) {
TableManager tableManager = new TableManager(
+ "test",
revisionUpdater,
tblsCfg,
rm,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
index 7a15981de5..02f47b9e29 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
@@ -53,7 +53,7 @@ class RebalanceRaftGroupEventsListenerTest {
void testOnReconfigurationErrorCalledFromResetWithNullStatus() throws Exception {
IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null, null, 0, busyLock, null, null));
+ RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null, null, 0, busyLock, null, null, null));
NodeImpl node = Mockito.mock(NodeImpl.class);