You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2022/09/12 17:08:56 UTC

[cassandra] branch cassandra-4.0 updated: Prevent flakiness in MixedModeMessageForwardTest

This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new f65f123780 Prevent flakiness in MixedModeMessageForwardTest
f65f123780 is described below

commit f65f1237800b5e884309bbb1bb69d1302466ee80
Author: Josh McKenzie <jm...@apache.org>
AuthorDate: Wed Sep 7 13:55:54 2022 -0400

    Prevent flakiness in MixedModeMessageForwardTest
    
    Patch by Jon Meredith; reviewed by Josh McKenzie for CASSANDRA-17866
---
 .../upgrade/MixedModeMessageForwardTest.java       | 43 ++++++++++++++++------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
index 78e22195f0..29c3209a6f 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.upgrade;
 
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -28,7 +29,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.shared.Shared;
+import org.awaitility.Awaitility;
 
 import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
@@ -41,23 +44,40 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
     private static final String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
     private static final String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
 
-    private void writeReadTest(UpgradeableCluster cluster)
+    static boolean checkSelectQueriesRespond(IUpgradeableInstance instance, int keyToInsert)
     {
-        // Coordinate a write from each node and then check present on all replicas
-        int readKey = nextKey;
-        for (int coordId = 1; coordId <= cluster.size(); coordId++)
+        try
         {
-            logger.info("Coordinating CL.ALL Insert from node{} ", coordId);
-            cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, nextKey++);
+            instance.coordinator().execute(CHECK_QUERY, ConsistencyLevel.ALL, 0);
+            return true;
         }
+        catch (Throwable tr)
+        {
+            logger.info("Select CL.ALL failed, retrying.", tr);
+            return false;
+        }
+    }
 
+    private void writeReadTest(UpgradeableCluster cluster)
+    {
+        // Coordinate a write from each node and then check present on all replicas
         for (int coordId = 1; coordId <= cluster.size(); coordId++)
         {
+            final IUpgradeableInstance instance = cluster.get(coordId);
+            final int keyToInsert = nextKey++;
+
+            // Wait for the messaging service to be connected for up to a minute by issuing
+            // a CL.ALL read that requires a connection to all other instances
+            Awaitility.await("MessagingService ready CL.ALL select from node" + coordId)
+                      .atMost(1, TimeUnit.MINUTES)
+                      .until(() -> checkSelectQueriesRespond(instance, keyToInsert));
+
+            cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, keyToInsert);
+
             for (int nodeId = 1; nodeId <= cluster.size(); nodeId++) {
-                Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, readKey);
-                assertRows(results, row(readKey));
+                Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, keyToInsert);
+                assertRows(results, row(keyToInsert));
             }
-            readKey++;
         }
     }
 
@@ -68,7 +88,8 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
      *    configured to write to all replicas in the datacenter
      * 2) check the original single-version cluster by issuing an INSERT
      *    mutation from a coordinator on each node, then check that value
-     *    has locally been written to each of the nodes.
+     *    has locally been written to each of the nodes. Before each INSERT, a CL.ALL SELECT
+     *    is retried until it succeeds, to give time for internode messaging to be established after upgrade.
      * 3) Upgrade nodes one at a time, rechecking that all writes are forwarded.
      */
     @Test
@@ -81,7 +102,7 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
                                   .collect(Collectors.joining(","));
 
         new TestCase()
-        .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
+        .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
         .withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
         .nodes(numDCs * nodesPerDc)
         .singleUpgrade(v30, v40)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org