Posted to commits@ignite.apache.org by tk...@apache.org on 2023/11/03 06:39:02 UTC

(ignite-3) branch main updated: IGNITE-20301 Fix flaky tests in ItIgniteInMemoryNodeRestartTest (#2778)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c15f14f4dd IGNITE-20301 Fix flaky tests in ItIgniteInMemoryNodeRestartTest (#2778)
c15f14f4dd is described below

commit c15f14f4ddca7d6e9ab3e02d5a661e68d6e9ee21
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Nov 3 10:38:56 2023 +0400

    IGNITE-20301 Fix flaky tests in ItIgniteInMemoryNodeRestartTest (#2778)
---
 .../rebalance/ItRebalanceDistributedTest.java      |   1 -
 .../app/ItIgniteInMemoryNodeRestartTest.java       |  47 +++++-----
 .../runner/app/ItIgniteNodeRestartTest.java        |   1 -
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 -
 .../PartitionReplicatorNodeRecovery.java           | 102 ++++++++++++++++++++-
 .../internal/table/distributed/TableManager.java   |   4 +-
 .../table/distributed/TableManagerTest.java        |   1 -
 7 files changed, 126 insertions(+), 31 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index bca46ab13c..64dca921ee 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1012,7 +1012,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                     replicaManager,
                     Mockito.mock(LockManager.class),
                     replicaSvc,
-                    clusterService.topologyService(),
                     txManager,
                     dataStorageMgr,
                     storagePath,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 068aad1332..b82b0e726e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -160,7 +160,6 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
      * Restarts an in-memory node that is not a leader of the table's partition.
      */
     @Test
-    @Disabled("IGNITE-20301")
     public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws Exception {
         // Start three nodes, the first one is going to be CMG and MetaStorage leader.
         IgniteImpl ignite = startNode(testInfo, 0);
@@ -177,16 +176,22 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
         LeaderWithTerm leaderWithTerm = raftGroupService.refreshAndGetLeaderWithTerm().join();
         String leaderId = leaderWithTerm.leader().consistentId();
 
+        log.info("Leader is {}", leaderId);
+
         // Find the index of any node that is not a leader of the partition group.
         int idxToStop = IntStream.range(1, 3)
                 .filter(idx -> !leaderId.equals(ignite(idx).node().name()))
                 .findFirst().getAsInt();
 
+        log.info("Stopping node {}", idxToStop);
+
         // Restart the node.
         stopNode(idxToStop);
 
         IgniteImpl restartingNode = startNode(testInfo, idxToStop);
 
+        log.info("Restarted node {}", restartingNode.name());
+
         Loza loza = restartingNode.raftManager();
 
         String restartingNodeConsistentId = restartingNode.name();
@@ -195,31 +200,32 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
         InternalTableImpl internalTable = (InternalTableImpl) restartingTable.internalTable();
 
         // Check that it restarts.
-        assertTrue(waitForCondition(
-                () -> {
-                    boolean raftNodeStarted = loza.localNodes().stream().anyMatch(nodeId -> {
-                        if (nodeId.groupId() instanceof TablePartitionId) {
-                            return ((TablePartitionId) nodeId.groupId()).tableId() == table.tableId();
-                        }
+        waitForCondition(
+                () -> isRaftNodeStarted(table, loza) && solePartitionAssignmentsContain(restartingNodeConsistentId, internalTable),
+                TimeUnit.SECONDS.toMillis(10)
+        );
 
-                        return false;
-                    });
+        assertTrue(isRaftNodeStarted(table, loza), "Raft node of the partition is not started on " + restartingNodeConsistentId);
+        assertTrue(
+                solePartitionAssignmentsContain(restartingNodeConsistentId, internalTable),
+                "Assignments do not contain node " + restartingNodeConsistentId
+        );
 
-                    if (!raftNodeStarted) {
-                        return false;
-                    }
+        // Check the data rebalanced correctly.
+        checkTableWithData(restartingNode, TABLE_NAME);
+    }
 
-                    Map<Integer, List<String>> assignments = internalTable.peersAndLearners();
+    private static boolean solePartitionAssignmentsContain(String restartingNodeConsistentId, InternalTableImpl internalTable) {
+        Map<Integer, List<String>> assignments = internalTable.peersAndLearners();
 
-                    List<String> partitionAssignments = assignments.get(0);
+        List<String> partitionAssignments = assignments.get(0);
 
-                    return partitionAssignments.contains(restartingNodeConsistentId);
-                },
-                TimeUnit.SECONDS.toMillis(10)
-        ));
+        return partitionAssignments.contains(restartingNodeConsistentId);
+    }
 
-        // Check the data rebalanced correctly.
-        checkTableWithData(restartingNode, TABLE_NAME);
+    private static boolean isRaftNodeStarted(TableImpl table, Loza loza) {
+        return loza.localNodes().stream().anyMatch(nodeId ->
+                nodeId.groupId() instanceof TablePartitionId && ((TablePartitionId) nodeId.groupId()).tableId() == table.tableId());
     }
 
     /**
@@ -266,7 +272,6 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
      * Restarts all the nodes with the partition.
      */
     @Test
-    @Disabled("IGNITE-20301")
     public void inMemoryNodeFullPartitionRestart(TestInfo testInfo) throws Exception {
         // Start three nodes, the first one is going to be CMG and MetaStorage leader.
         IgniteImpl ignite0 = startNode(testInfo, 0);
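
Note on the test change above: the old code wrapped the whole check in a single
assertTrue(waitForCondition(...)), so a timeout surfaced as a bare assertion
failure with no hint of which condition never held. The new code waits on the
combined condition, then asserts each part separately with its own message. A
minimal sketch of that pattern, assuming a simplified stand-in for Ignite's
waitForCondition test utility and two hypothetical conditions:

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    class WaitThenAssertSketch {
        // Simplified stand-in for the test utility: polls until the condition
        // holds or the timeout elapses, returning the condition's final state.
        static boolean waitForCondition(BooleanSupplier cond, long timeoutMillis)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                if (cond.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(50);
            }
            return cond.getAsBoolean();
        }

        static void check(BooleanSupplier raftStarted, BooleanSupplier assigned)
                throws InterruptedException {
            // Wait for both conditions, deliberately ignoring the combined result...
            waitForCondition(
                    () -> raftStarted.getAsBoolean() && assigned.getAsBoolean(),
                    TimeUnit.SECONDS.toMillis(10));

            // ...then assert each one separately, so a failure names the culprit.
            assert raftStarted.getAsBoolean() : "Raft node of the partition is not started";
            assert assigned.getAsBoolean() : "Assignments do not contain the node";
        }
    }
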
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1eecdd54fc..cab0fa74c3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -396,7 +396,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 replicaMgr,
                 lockManager,
                 replicaService,
-                clusterSvc.topologyService(),
                 txManager,
                 dataStorageManager,
                 storagePath,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fc67ac5fed..0f4e622b8e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -584,7 +584,6 @@ public class IgniteImpl implements Ignite {
                 replicaMgr,
                 lockMgr,
                 replicaSvc,
-                clusterSvc.topologyService(),
                 txManager,
                 dataStorageMgr,
                 storagePath,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
index b432cdd4c4..c33ea46d46 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -19,12 +19,18 @@ package org.apache.ignite.internal.table.distributed;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toSet;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.IntFunction;
 import org.apache.ignite.internal.affinity.Assignment;
@@ -43,13 +49,17 @@ import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import org.apache.ignite.internal.utils.RebalanceUtil;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
 
 /**
  * Code specific to recovering a partition replicator group node. This includes a case when we lost metadata
  * that is required for the replication protocol (for instance, for RAFT it's about group metadata).
  */
 class PartitionReplicatorNodeRecovery {
-    private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3);
+    private static final long QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3);
+
+    private static final long PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3);
 
     private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
 
@@ -57,6 +67,8 @@ class PartitionReplicatorNodeRecovery {
 
     private final MessagingService messagingService;
 
+    private final TopologyService topologyService;
+
     /** Resolver that resolves a node consistent ID to cluster node. */
     private final Function<String, ClusterNode> clusterNodeResolver;
 
@@ -66,10 +78,13 @@ class PartitionReplicatorNodeRecovery {
     PartitionReplicatorNodeRecovery(
             MetaStorageManager metaStorageManager,
             MessagingService messagingService,
+            TopologyService topologyService,
             Function<String, ClusterNode> clusterNodeResolver,
-            IntFunction<TableImpl> tableById) {
+            IntFunction<TableImpl> tableById
+    ) {
         this.metaStorageManager = metaStorageManager;
         this.messagingService = messagingService;
+        this.topologyService = topologyService;
         this.clusterNodeResolver = clusterNodeResolver;
         this.tableById = tableById;
     }
@@ -152,7 +167,7 @@ class PartitionReplicatorNodeRecovery {
 
         // No majority and not a full partition restart - need to 'remove, then add' nodes
         // with current partition.
-        return queryDataNodesCount(tableId, partId, newConfiguration.peers())
+        return waitForPeersAndQueryDataNodesCount(tableId, partId, newConfiguration.peers())
                 .thenApply(dataNodesCount -> {
                     boolean fullPartitionRestart = dataNodesCount == 0;
 
@@ -184,16 +199,93 @@ class PartitionReplicatorNodeRecovery {
      * @param peers Raft peers.
      * @return A future that will hold the quantity of data nodes.
      */
-    private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId, Collection<Peer> peers) {
+    private CompletableFuture<Long> waitForPeersAndQueryDataNodesCount(int tblId, int partId, Collection<Peer> peers) {
         HasDataRequest request = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
 
+        return allPeersAreInTopology(peers)
+                .thenCompose(unused -> queryDataNodesCount(peers, request));
+    }
+
+    private CompletableFuture<?> allPeersAreInTopology(Collection<Peer> peers) {
+        Set<String> peerConsistentIds = peers.stream()
+                .map(Peer::consistentId)
+                .collect(toSet());
+
+        Map<String, ClusterNode> peerNodesByConsistentIds = new ConcurrentHashMap<>();
+
+        for (Peer peer : peers) {
+            ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+
+            if (node != null) {
+                peerNodesByConsistentIds.put(peer.consistentId(), node);
+            }
+        }
+
+        if (peerNodesByConsistentIds.size() >= peers.size()) {
+            return completedFuture(null);
+        }
+
+        CompletableFuture<Void> allPeersAreSeenInTopology = new CompletableFuture<>();
+
+        TopologyEventHandler eventHandler = new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                if (peerConsistentIds.contains(member.name())) {
+                    peerNodesByConsistentIds.put(member.name(), member);
+                }
+
+                if (peerNodesByConsistentIds.size() >= peers.size()) {
+                    allPeersAreSeenInTopology.complete(null);
+                }
+            }
+        };
+
+        topologyService.addEventHandler(eventHandler);
+
+        // Check again for peers that could appear in the topology since last check, but before we installed the handler.
+        for (Peer peer : peers) {
+            if (!peerNodesByConsistentIds.containsKey(peer.consistentId())) {
+                ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+
+                if (node != null) {
+                    peerNodesByConsistentIds.put(peer.consistentId(), node);
+                }
+            }
+        }
+
+        if (peerNodesByConsistentIds.size() >= peers.size()) {
+            return completedFuture(null);
+        }
+
+        // TODO: remove the handler after https://issues.apache.org/jira/browse/IGNITE-14519 is implemented.
+
+        return withTimeout(allPeersAreSeenInTopology);
+    }
+
+    private static CompletableFuture<Void> withTimeout(CompletableFuture<Void> future) {
+        return future.orTimeout(PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
+                .handle((res, ex) -> {
+                    if (ex instanceof TimeoutException) {
+                        return completedFuture(res);
+                    }
+
+                    if (ex != null) {
+                        return CompletableFuture.<Void>failedFuture(ex);
+                    }
+
+                    return completedFuture(res);
+                })
+                .thenCompose(identity());
+    }
+
+    private CompletableFuture<Long> queryDataNodesCount(Collection<Peer> peers, HasDataRequest request) {
         //noinspection unchecked
         CompletableFuture<Boolean>[] requestFutures = peers.stream()
                 .map(Peer::consistentId)
                 .map(clusterNodeResolver)
                 .filter(Objects::nonNull)
                 .map(node -> messagingService
-                        .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
+                        .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS)
                         .thenApply(response -> {
                             assert response instanceof HasDataResponse : response;
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0a4bad9ae4..c92d00647d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -374,7 +374,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             ReplicaManager replicaMgr,
             LockManager lockMgr,
             ReplicaService replicaSvc,
-            TopologyService topologyService,
             TxManager txManager,
             DataStorageManager dataStorageMgr,
             Path storagePath,
@@ -413,6 +412,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         this.observableTimestampTracker = observableTimestampTracker;
         this.placementDriver = placementDriver;
 
+        TopologyService topologyService = clusterService.topologyService();
+
         clusterNodeResolver = topologyService::getByConsistentId;
 
         transactionStateResolver = new TransactionStateResolver(
@@ -478,6 +479,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
                 metaStorageMgr,
                 clusterService.messagingService(),
+                topologyService,
                 clusterNodeResolver,
                 tableId -> latestTablesById().get(tableId)
         );
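
Note on the TableManager change above: TopologyService is no longer injected
separately but derived from the ClusterService the constructor already receives,
which is why one argument disappears from IgniteImpl and both tests. A minimal
sketch of that dependency-narrowing, with hypothetical Cluster/Topology types in
place of ClusterService and TopologyService:

    // Hypothetical stand-ins for the real network-module interfaces.
    interface Topology { }
    interface Cluster { Topology topology(); }

    class Manager {
        private final Topology topology;

        // Before: Manager(Cluster cluster, Topology topology) took two arguments
        // that had to stay mutually consistent; now one is derived from the other.
        Manager(Cluster cluster) {
            this.topology = cluster.topology();
        }
    }
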
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index ae65c20be9..27926c48b6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -698,7 +698,6 @@ public class TableManagerTest extends IgniteAbstractTest {
                 replicaMgr,
                 null,
                 null,
-                ts,
                 tm,
                 dsm = createDataStorageManager(configRegistry, workDir, storageEngineConfig),
                 workDir,