You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/11/19 20:38:51 UTC

[cassandra] branch trunk updated: Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c15f530  Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
c15f530 is described below

commit c15f530b63a1cd4d5b2835bb418197145beb7bb6
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Fri Nov 19 12:58:02 2021 -0600

    Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
    
    patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-17165
---
 .../distributed/test/SecondaryIndexTest.java       | 58 ++++++++++++++--------
 1 file changed, 37 insertions(+), 21 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
index 7b1c4ad..b530dcc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
@@ -20,14 +20,16 @@ package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -43,9 +45,10 @@ public class SecondaryIndexTest extends TestBaseImpl
     private static final int NUM_NODES = 3;
     private static final int REPLICATION_FACTOR = 1;
     private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, PRIMARY KEY (k))";
-    private static final String CREATE_INDEX = "CREATE INDEX v_index ON %s(v)";
+    private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON %s(v)";
 
     private static final AtomicInteger seq = new AtomicInteger();
+    
     private static String tableName;
     private static Cluster cluster;
 
@@ -65,10 +68,9 @@ public class SecondaryIndexTest extends TestBaseImpl
     @Before
     public void before()
     {
-        // create the table
         tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
         cluster.schemaChange(String.format(CREATE_TABLE, tableName));
-        cluster.schemaChange(String.format(CREATE_INDEX, tableName));
+        cluster.schemaChange(String.format(CREATE_INDEX, seq.get(), tableName));
     }
 
     @After
@@ -78,31 +80,45 @@ public class SecondaryIndexTest extends TestBaseImpl
     }
 
     @Test
-    public void test_only_coordinator_chooses_index_for_query() throws InterruptedException, UnknownHostException
+    public void test_only_coordinator_chooses_index_for_query()
     {
         for (int i = 0 ; i < 99 ; ++i)
             cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, i/3);
         cluster.forEach(i -> i.flush(KEYSPACE));
 
-        for (int i = 0 ; i < 33 ; ++i)
+        Pattern indexScanningPattern =
+                Pattern.compile(String.format("Index mean cardinalities are v_index_%d:[0-9]+. Scanning with v_index_%d.", seq.get(), seq.get()));
+
+        for (int i = 0 ; i < 33; ++i)
         {
             UUID trace = UUID.randomUUID();
             Object[][] result = cluster.coordinator(1).executeWithTracing(trace, String.format("SELECT * FROM %s WHERE v = ?", tableName), ConsistencyLevel.ALL, i);
-            Assert.assertEquals(3, result.length);
-            Thread.sleep(100L);
-            Object[][] traces = cluster.coordinator(1).execute(String.format("SELECT source, activity FROM system_traces.events WHERE session_id = ?", tableName), ConsistencyLevel.ALL, trace);
-            List<InetAddress> scanning = Arrays.stream(traces)
-                                               .filter(t -> t[1].toString().matches("Index mean cardinalities are v_index:[0-9]+. Scanning with v_index."))
-                                               .map(t -> (InetAddress) t[0])
-                                               .distinct().collect(Collectors.toList());
-
-            List<InetAddress> executing = Arrays.stream(traces)
-                                                .filter(t -> t[1].toString().equals("Executing read on " + tableName + " using index v_index"))
-                                                .map(t -> (InetAddress) t[0])
-                                                .distinct().collect(Collectors.toList());
-
-            Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()), scanning);
-            Assert.assertEquals(3, executing.size());
+            Assert.assertEquals("Failed on iteration " + i, 3, result.length);
+
+            Awaitility.await("For all events in the tracing session to persist")
+                    .pollInterval(100, TimeUnit.MILLISECONDS)
+                    .atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> 
+                                   {
+                                       Object[][] traces = cluster.coordinator(1)
+                                                                  .execute("SELECT source, activity FROM system_traces.events WHERE session_id = ?", 
+                                                                           ConsistencyLevel.ALL, trace);
+
+                                       List<InetAddress> scanning =
+                                               Arrays.stream(traces)
+                                                     .filter(t -> indexScanningPattern.matcher(t[1].toString()).matches())
+                                                     .map(t -> (InetAddress) t[0])
+                                                     .distinct().collect(Collectors.toList());
+
+                                       List<InetAddress> executing =
+                                               Arrays.stream(traces)
+                                                     .filter(t -> t[1].toString().equals(String.format("Executing read on " + tableName + " using index v_index_%d", seq.get())))
+                                                     .map(t -> (InetAddress) t[0])
+                                                     .distinct().collect(Collectors.toList());
+
+                                       Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()), scanning);
+                                       Assert.assertEquals(3, executing.size());
+                                   });
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org