You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2022/09/12 17:08:56 UTC
[cassandra] branch cassandra-4.0 updated: Prevent flakiness in MixedModeMessageForwardTest
This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new f65f123780 Prevent flakiness in MixedModeMessageForwardTest
f65f123780 is described below
commit f65f1237800b5e884309bbb1bb69d1302466ee80
Author: Josh McKenzie <jm...@apache.org>
AuthorDate: Wed Sep 7 13:55:54 2022 -0400
Prevent flakiness in MixedModeMessageForwardTest
Patch by Jon Meredith; reviewed by Josh McKenzie for CASSANDRA-17866
---
.../upgrade/MixedModeMessageForwardTest.java | 43 ++++++++++++++++------
1 file changed, 32 insertions(+), 11 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
index 78e22195f0..29c3209a6f 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.upgrade;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -28,7 +29,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.shared.Shared;
+import org.awaitility.Awaitility;
import static org.apache.cassandra.distributed.shared.AssertUtils.*;
@@ -41,23 +44,40 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
private static final String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
private static final String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
- private void writeReadTest(UpgradeableCluster cluster)
+ static boolean checkSelectQueriesRespond(IUpgradeableInstance instance, int keyToInsert)
{
- // Coordinate a write from each node and then check present on all replicas
- int readKey = nextKey;
- for (int coordId = 1; coordId <= cluster.size(); coordId++)
+ try
{
- logger.info("Coordinating CL.ALL Insert from node{} ", coordId);
- cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, nextKey++);
+ instance.coordinator().execute(CHECK_QUERY, ConsistencyLevel.ALL, 0);
+ return true;
}
+ catch (Throwable tr)
+ {
+ logger.info("Select CL.ALL failed, retrying.", tr);
+ return false;
+ }
+ }
+ private void writeReadTest(UpgradeableCluster cluster)
+ {
+ // Coordinate a write from each node and then check present on all replicas
for (int coordId = 1; coordId <= cluster.size(); coordId++)
{
+ final IUpgradeableInstance instance = cluster.get(coordId);
+ final int keyToInsert = nextKey++;
+
+ // Wait for the messaging service to be connected for up to a minute by issuing
+ // a CL.ALL read that requires a connection to all other instances
+ Awaitility.await("MessagingService ready CL.ALL select from node" + coordId)
+ .atMost(1, TimeUnit.MINUTES)
+ .until(() -> checkSelectQueriesRespond(instance, keyToInsert));
+
+ cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, keyToInsert);
+
for (int nodeId = 1; nodeId <= cluster.size(); nodeId++) {
- Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, readKey);
- assertRows(results, row(readKey));
+ Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, keyToInsert);
+ assertRows(results, row(keyToInsert));
}
- readKey++;
}
}
@@ -68,7 +88,8 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
* configured to write to all replicas in the datacenter
* 2) check the original single-version cluster by issuing an INSERT
* mutation from a coordinator on each node, then check that value
- * has locally been written to each of the nodes.
+ * has locally been written to each of the nodes. Before each INSERT, a CL.ALL SELECT is
+ * retried until ALL nodes respond to give time for internode messaging to be established after upgrade.
* 3) Upgrade nodes one at a time, rechecking that all writes are forwarded.
*/
@Test
@@ -81,7 +102,7 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
.collect(Collectors.joining(","));
new TestCase()
- .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
.withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
.nodes(numDCs * nodesPerDc)
.singleUpgrade(v30, v40)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org