You are viewing a plain text version of this content. The canonical link to it is available in the original (HTML) version of this message; it is not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/11/03 06:39:02 UTC
(ignite-3) branch main updated: IGNITE-20301 Fix flaky tests in ItIgniteInMemoryNodeRestartTest (#2778)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c15f14f4dd IGNITE-20301 Fix flaky tests in ItIgniteInMemoryNodeRestartTest (#2778)
c15f14f4dd is described below
commit c15f14f4ddca7d6e9ab3e02d5a661e68d6e9ee21
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Nov 3 10:38:56 2023 +0400
IGNITE-20301 Fix flaky tests in ItIgniteInMemoryNodeRestartTest (#2778)
---
.../rebalance/ItRebalanceDistributedTest.java | 1 -
.../app/ItIgniteInMemoryNodeRestartTest.java | 47 +++++-----
.../runner/app/ItIgniteNodeRestartTest.java | 1 -
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 -
.../PartitionReplicatorNodeRecovery.java | 102 ++++++++++++++++++++-
.../internal/table/distributed/TableManager.java | 4 +-
.../table/distributed/TableManagerTest.java | 1 -
7 files changed, 126 insertions(+), 31 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index bca46ab13c..64dca921ee 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1012,7 +1012,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
replicaManager,
Mockito.mock(LockManager.class),
replicaSvc,
- clusterService.topologyService(),
txManager,
dataStorageMgr,
storagePath,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 068aad1332..b82b0e726e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -160,7 +160,6 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
* Restarts an in-memory node that is not a leader of the table's partition.
*/
@Test
- @Disabled("IGNITE-20301")
public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws Exception {
// Start three nodes, the first one is going to be CMG and MetaStorage leader.
IgniteImpl ignite = startNode(testInfo, 0);
@@ -177,16 +176,22 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
LeaderWithTerm leaderWithTerm = raftGroupService.refreshAndGetLeaderWithTerm().join();
String leaderId = leaderWithTerm.leader().consistentId();
+ log.info("Leader is {}", leaderId);
+
// Find the index of any node that is not a leader of the partition group.
int idxToStop = IntStream.range(1, 3)
.filter(idx -> !leaderId.equals(ignite(idx).node().name()))
.findFirst().getAsInt();
+ log.info("Stopping node {}", idxToStop);
+
// Restart the node.
stopNode(idxToStop);
IgniteImpl restartingNode = startNode(testInfo, idxToStop);
+ log.info("Restarted node {}", restartingNode.name());
+
Loza loza = restartingNode.raftManager();
String restartingNodeConsistentId = restartingNode.name();
@@ -195,31 +200,32 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
InternalTableImpl internalTable = (InternalTableImpl) restartingTable.internalTable();
// Check that it restarts.
- assertTrue(waitForCondition(
- () -> {
- boolean raftNodeStarted = loza.localNodes().stream().anyMatch(nodeId -> {
- if (nodeId.groupId() instanceof TablePartitionId) {
- return ((TablePartitionId) nodeId.groupId()).tableId() == table.tableId();
- }
+ waitForCondition(
+ () -> isRaftNodeStarted(table, loza) && solePartitionAssignmentsContain(restartingNodeConsistentId, internalTable),
+ TimeUnit.SECONDS.toMillis(10)
+ );
- return false;
- });
+ assertTrue(isRaftNodeStarted(table, loza), "Raft node of the partition is not started on " + restartingNodeConsistentId);
+ assertTrue(
+ solePartitionAssignmentsContain(restartingNodeConsistentId, internalTable),
+ "Assignments do not contain node " + restartingNodeConsistentId
+ );
- if (!raftNodeStarted) {
- return false;
- }
+ // Check the data rebalanced correctly.
+ checkTableWithData(restartingNode, TABLE_NAME);
+ }
- Map<Integer, List<String>> assignments = internalTable.peersAndLearners();
+ private static boolean solePartitionAssignmentsContain(String restartingNodeConsistentId, InternalTableImpl internalTable) {
+ Map<Integer, List<String>> assignments = internalTable.peersAndLearners();
- List<String> partitionAssignments = assignments.get(0);
+ List<String> partitionAssignments = assignments.get(0);
- return partitionAssignments.contains(restartingNodeConsistentId);
- },
- TimeUnit.SECONDS.toMillis(10)
- ));
+ return partitionAssignments.contains(restartingNodeConsistentId);
+ }
- // Check the data rebalanced correctly.
- checkTableWithData(restartingNode, TABLE_NAME);
+ private static boolean isRaftNodeStarted(TableImpl table, Loza loza) {
+ return loza.localNodes().stream().anyMatch(nodeId ->
+ nodeId.groupId() instanceof TablePartitionId && ((TablePartitionId) nodeId.groupId()).tableId() == table.tableId());
}
/**
@@ -266,7 +272,6 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
* Restarts all the nodes with the partition.
*/
@Test
- @Disabled("IGNITE-20301")
public void inMemoryNodeFullPartitionRestart(TestInfo testInfo) throws Exception {
// Start three nodes, the first one is going to be CMG and MetaStorage leader.
IgniteImpl ignite0 = startNode(testInfo, 0);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1eecdd54fc..cab0fa74c3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -396,7 +396,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
replicaMgr,
lockManager,
replicaService,
- clusterSvc.topologyService(),
txManager,
dataStorageManager,
storagePath,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fc67ac5fed..0f4e622b8e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -584,7 +584,6 @@ public class IgniteImpl implements Ignite {
replicaMgr,
lockMgr,
replicaSvc,
- clusterSvc.topologyService(),
txManager,
dataStorageMgr,
storagePath,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
index b432cdd4c4..c33ea46d46 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -19,12 +19,18 @@ package org.apache.ignite.internal.table.distributed;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toSet;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntFunction;
import org.apache.ignite.internal.affinity.Assignment;
@@ -43,13 +49,17 @@ import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
import org.apache.ignite.internal.utils.RebalanceUtil;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
/**
* Code specific to recovering a partition replicator group node. This includes a case when we lost metadata
* that is required for the replication protocol (for instance, for RAFT it's about group metadata).
*/
class PartitionReplicatorNodeRecovery {
- private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3);
+ private static final long QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3);
+
+ private static final long PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3);
private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
@@ -57,6 +67,8 @@ class PartitionReplicatorNodeRecovery {
private final MessagingService messagingService;
+ private final TopologyService topologyService;
+
/** Resolver that resolves a node consistent ID to cluster node. */
private final Function<String, ClusterNode> clusterNodeResolver;
@@ -66,10 +78,13 @@ class PartitionReplicatorNodeRecovery {
PartitionReplicatorNodeRecovery(
MetaStorageManager metaStorageManager,
MessagingService messagingService,
+ TopologyService topologyService,
Function<String, ClusterNode> clusterNodeResolver,
- IntFunction<TableImpl> tableById) {
+ IntFunction<TableImpl> tableById
+ ) {
this.metaStorageManager = metaStorageManager;
this.messagingService = messagingService;
+ this.topologyService = topologyService;
this.clusterNodeResolver = clusterNodeResolver;
this.tableById = tableById;
}
@@ -152,7 +167,7 @@ class PartitionReplicatorNodeRecovery {
// No majority and not a full partition restart - need to 'remove, then add' nodes
// with current partition.
- return queryDataNodesCount(tableId, partId, newConfiguration.peers())
+ return waitForPeersAndQueryDataNodesCount(tableId, partId, newConfiguration.peers())
.thenApply(dataNodesCount -> {
boolean fullPartitionRestart = dataNodesCount == 0;
@@ -184,16 +199,93 @@ class PartitionReplicatorNodeRecovery {
* @param peers Raft peers.
* @return A future that will hold the quantity of data nodes.
*/
- private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId, Collection<Peer> peers) {
+ private CompletableFuture<Long> waitForPeersAndQueryDataNodesCount(int tblId, int partId, Collection<Peer> peers) {
HasDataRequest request = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
+ return allPeersAreInTopology(peers)
+ .thenCompose(unused -> queryDataNodesCount(peers, request));
+ }
+
+ private CompletableFuture<?> allPeersAreInTopology(Collection<Peer> peers) {
+ Set<String> peerConsistentIds = peers.stream()
+ .map(Peer::consistentId)
+ .collect(toSet());
+
+ Map<String, ClusterNode> peerNodesByConsistentIds = new ConcurrentHashMap<>();
+
+ for (Peer peer : peers) {
+ ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+
+ if (node != null) {
+ peerNodesByConsistentIds.put(peer.consistentId(), node);
+ }
+ }
+
+ if (peerNodesByConsistentIds.size() >= peers.size()) {
+ return completedFuture(null);
+ }
+
+ CompletableFuture<Void> allPeersAreSeenInTopology = new CompletableFuture<>();
+
+ TopologyEventHandler eventHandler = new TopologyEventHandler() {
+ @Override
+ public void onAppeared(ClusterNode member) {
+ if (peerConsistentIds.contains(member.name())) {
+ peerNodesByConsistentIds.put(member.name(), member);
+ }
+
+ if (peerNodesByConsistentIds.size() >= peers.size()) {
+ allPeersAreSeenInTopology.complete(null);
+ }
+ }
+ };
+
+ topologyService.addEventHandler(eventHandler);
+
+ // Check again for peers that could appear in the topology since last check, but before we installed the handler.
+ for (Peer peer : peers) {
+ if (!peerNodesByConsistentIds.containsKey(peer.consistentId())) {
+ ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+
+ if (node != null) {
+ peerNodesByConsistentIds.put(peer.consistentId(), node);
+ }
+ }
+ }
+
+ if (peerNodesByConsistentIds.size() >= peers.size()) {
+ return completedFuture(null);
+ }
+
+ // TODO: remove the handler after https://issues.apache.org/jira/browse/IGNITE-14519 is implemented.
+
+ return withTimeout(allPeersAreSeenInTopology);
+ }
+
+ private static CompletableFuture<Void> withTimeout(CompletableFuture<Void> future) {
+ return future.orTimeout(PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
+ .handle((res, ex) -> {
+ if (ex instanceof TimeoutException) {
+ return completedFuture(res);
+ }
+
+ if (ex != null) {
+ return CompletableFuture.<Void>failedFuture(ex);
+ }
+
+ return completedFuture(res);
+ })
+ .thenCompose(identity());
+ }
+
+ private CompletableFuture<Long> queryDataNodesCount(Collection<Peer> peers, HasDataRequest request) {
//noinspection unchecked
CompletableFuture<Boolean>[] requestFutures = peers.stream()
.map(Peer::consistentId)
.map(clusterNodeResolver)
.filter(Objects::nonNull)
.map(node -> messagingService
- .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
+ .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS)
.thenApply(response -> {
assert response instanceof HasDataResponse : response;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0a4bad9ae4..c92d00647d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -374,7 +374,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
ReplicaManager replicaMgr,
LockManager lockMgr,
ReplicaService replicaSvc,
- TopologyService topologyService,
TxManager txManager,
DataStorageManager dataStorageMgr,
Path storagePath,
@@ -413,6 +412,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
this.observableTimestampTracker = observableTimestampTracker;
this.placementDriver = placementDriver;
+ TopologyService topologyService = clusterService.topologyService();
+
clusterNodeResolver = topologyService::getByConsistentId;
transactionStateResolver = new TransactionStateResolver(
@@ -478,6 +479,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
metaStorageMgr,
clusterService.messagingService(),
+ topologyService,
clusterNodeResolver,
tableId -> latestTablesById().get(tableId)
);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index ae65c20be9..27926c48b6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -698,7 +698,6 @@ public class TableManagerTest extends IgniteAbstractTest {
replicaMgr,
null,
null,
- ts,
tm,
dsm = createDataStorageManager(configRegistry, workDir, storageEngineConfig),
workDir,