You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/11/19 20:38:51 UTC
[cassandra] branch trunk updated: Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c15f530 Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
c15f530 is described below
commit c15f530b63a1cd4d5b2835bb418197145beb7bb6
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Fri Nov 19 12:58:02 2021 -0600
Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-17165
---
.../distributed/test/SecondaryIndexTest.java | 58 ++++++++++++++--------
1 file changed, 37 insertions(+), 21 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
index 7b1c4ad..b530dcc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
@@ -20,14 +20,16 @@ package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -43,9 +45,10 @@ public class SecondaryIndexTest extends TestBaseImpl
private static final int NUM_NODES = 3;
private static final int REPLICATION_FACTOR = 1;
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, PRIMARY KEY (k))";
- private static final String CREATE_INDEX = "CREATE INDEX v_index ON %s(v)";
+ private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON %s(v)";
private static final AtomicInteger seq = new AtomicInteger();
+
private static String tableName;
private static Cluster cluster;
@@ -65,10 +68,9 @@ public class SecondaryIndexTest extends TestBaseImpl
@Before
public void before()
{
- // create the table
tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
cluster.schemaChange(String.format(CREATE_TABLE, tableName));
- cluster.schemaChange(String.format(CREATE_INDEX, tableName));
+ cluster.schemaChange(String.format(CREATE_INDEX, seq.get(), tableName));
}
@After
@@ -78,31 +80,45 @@ public class SecondaryIndexTest extends TestBaseImpl
}
@Test
- public void test_only_coordinator_chooses_index_for_query() throws InterruptedException, UnknownHostException
+ public void test_only_coordinator_chooses_index_for_query()
{
for (int i = 0 ; i < 99 ; ++i)
cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, i/3);
cluster.forEach(i -> i.flush(KEYSPACE));
- for (int i = 0 ; i < 33 ; ++i)
+ Pattern indexScanningPattern =
+ Pattern.compile(String.format("Index mean cardinalities are v_index_%d:[0-9]+. Scanning with v_index_%d.", seq.get(), seq.get()));
+
+ for (int i = 0 ; i < 33; ++i)
{
UUID trace = UUID.randomUUID();
Object[][] result = cluster.coordinator(1).executeWithTracing(trace, String.format("SELECT * FROM %s WHERE v = ?", tableName), ConsistencyLevel.ALL, i);
- Assert.assertEquals(3, result.length);
- Thread.sleep(100L);
- Object[][] traces = cluster.coordinator(1).execute(String.format("SELECT source, activity FROM system_traces.events WHERE session_id = ?", tableName), ConsistencyLevel.ALL, trace);
- List<InetAddress> scanning = Arrays.stream(traces)
- .filter(t -> t[1].toString().matches("Index mean cardinalities are v_index:[0-9]+. Scanning with v_index."))
- .map(t -> (InetAddress) t[0])
- .distinct().collect(Collectors.toList());
-
- List<InetAddress> executing = Arrays.stream(traces)
- .filter(t -> t[1].toString().equals("Executing read on " + tableName + " using index v_index"))
- .map(t -> (InetAddress) t[0])
- .distinct().collect(Collectors.toList());
-
- Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()), scanning);
- Assert.assertEquals(3, executing.size());
+ Assert.assertEquals("Failed on iteration " + i, 3, result.length);
+
+ Awaitility.await("For all events in the tracing session to persist")
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() ->
+ {
+ Object[][] traces = cluster.coordinator(1)
+ .execute("SELECT source, activity FROM system_traces.events WHERE session_id = ?",
+ ConsistencyLevel.ALL, trace);
+
+ List<InetAddress> scanning =
+ Arrays.stream(traces)
+ .filter(t -> indexScanningPattern.matcher(t[1].toString()).matches())
+ .map(t -> (InetAddress) t[0])
+ .distinct().collect(Collectors.toList());
+
+ List<InetAddress> executing =
+ Arrays.stream(traces)
+ .filter(t -> t[1].toString().equals(String.format("Executing read on " + tableName + " using index v_index_%d", seq.get())))
+ .map(t -> (InetAddress) t[0])
+ .distinct().collect(Collectors.toList());
+
+ Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()), scanning);
+ Assert.assertEquals(3, executing.size());
+ });
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org