You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/06/06 20:33:34 UTC
[cassandra] branch cassandra-4.1 updated: Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new d3ce825bf2 Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition
d3ce825bf2 is described below
commit d3ce825bf2b376fd2516e4b594ddb69037c13159
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Jun 3 14:37:39 2022 -0700
Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition
patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-17244
---
.../test/thresholds/TombstoneCountWarningTest.java | 113 +++++++++++++++++++++
1 file changed, 113 insertions(+)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java
index 04668a9589..5e409cbc31 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.distributed.test.thresholds;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
@@ -33,9 +35,17 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.SimpleStatement;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.distributed.Cluster;
@@ -49,16 +59,22 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.utils.Shared;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
+import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.assertj.core.api.Assertions.assertThat;
public class TombstoneCountWarningTest extends TestBaseImpl
{
+ private static final Logger logger = LoggerFactory.getLogger(TombstoneCountWarningTest.class);
+
private static final int TOMBSTONE_WARN = 50;
private static final int TOMBSTONE_FAIL = 100;
private static ICluster<IInvokableInstance> CLUSTER;
@@ -68,10 +84,12 @@ public class TombstoneCountWarningTest extends TestBaseImpl
@BeforeClass
public static void setupClass() throws IOException
{
+ logger.info("[test step : @BeforeClass] setupClass");
Cluster.Builder builder = Cluster.build(3);
builder.withConfig(c -> c.set("tombstone_warn_threshold", TOMBSTONE_WARN)
.set("tombstone_failure_threshold", TOMBSTONE_FAIL)
.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+ builder.withInstanceInitializer(BB::install);
CLUSTER = builder.start();
JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
@@ -80,6 +98,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl
@AfterClass
public static void teardown()
{
+ logger.info("[test step : @AfterClass] teardown");
if (JAVA_DRIVER_SESSION != null)
JAVA_DRIVER_SESSION.close();
if (JAVA_DRIVER != null)
@@ -89,6 +108,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl
@Before
public void setup()
{
+ logger.info("[test step : @Before] setup");
CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
init(CLUSTER);
CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
@@ -102,12 +122,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl
@Test
public void noWarningsSinglePartition()
{
+ logger.info("[test step : @Test] noWarningsSinglePartition");
noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
}
@Test
public void noWarningsScan()
{
+ logger.info("[test step : @Test] noWarningsScan");
noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
}
@@ -134,12 +156,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl
@Test
public void warnThresholdSinglePartition()
{
+ logger.info("[test step : @Test] warnThresholdSinglePartition");
warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
}
@Test
public void warnThresholdScan()
{
+ logger.info("[test step : @Test] warnThresholdScan");
warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
}
@@ -183,12 +207,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl
@Test
public void failThresholdSinglePartition() throws UnknownHostException
{
+ logger.info("[test step : @Test] failThresholdSinglePartition");
failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
}
@Test
public void failThresholdScan() throws UnknownHostException
{
+ logger.info("[test step : @Test] failThresholdScan");
failThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
}
@@ -248,7 +274,10 @@ public class TombstoneCountWarningTest extends TestBaseImpl
assertWarnAborts(0, 2, 1);
+ // when disabled warnings only happen if on the coordinator, and coordinator may not be the one replying
+ // to every query
enable(false);
+ State.blockFor(CLUSTER.get(1).config().broadcastAddress());
warnings = CLUSTER.get(1).callsOnInstance(() -> {
ClientWarn.instance.captureWarnings();
try
@@ -259,7 +288,12 @@ public class TombstoneCountWarningTest extends TestBaseImpl
catch (ReadFailureException e)
{
Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class);
+ Assertions.assertThat(e.failureReasonByEndpoint).isNotEmpty();
+ Assertions.assertThat(e.failureReasonByEndpoint.values())
+ .as("Non READ_TOO_MANY_TOMBSTONES exists")
+ .allMatch(RequestFailureReason.READ_TOO_MANY_TOMBSTONES::equals);
}
+ logger.warn("Checking warnings...");
return ClientWarn.instance.getWarnings();
}).call();
// client warnings are currently coordinator only, so if present only 1 is expected
@@ -276,6 +310,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl
assertWarnAborts(0, 2, 0);
+ State.blockFor(CLUSTER.get(1).config().broadcastAddress());
try
{
driverQueryAll(cql);
@@ -327,4 +362,82 @@ public class TombstoneCountWarningTest extends TestBaseImpl
{
return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
}
+
+ @Shared
+ public static class State
+ {
+ // use InetSocketAddress as InetAddressAndPort is @Isolated which means equality doesn't work due to different
+ // ClassLoaders; InetSocketAddress is @Shared so safe to use between app and cluster class loaders
+ public static volatile InetSocketAddress blockFor = null; // address whose failure is awaited; null when not armed
+ public static volatile CompletableFuture<Void> promise = null; // completed by onFailure, joined by syncAndClear
+
+ // called on main thread; arms the gate by recording the address to wait for and installing a fresh promise
+ public static void blockFor(InetSocketAddress address)
+ {
+ blockFor = address;
+ promise = new CompletableFuture<>();
+ }
+
+ // called in C* threads; non-test threads; completes the promise when the failing address equals blockFor
+ public static void onFailure(InetSocketAddress address)
+ {
+ if (address.equals(blockFor))
+ promise.complete(null); // NOTE(review): blockFor and promise are written separately; confirm promise cannot be null/stale here
+ }
+
+ // called from BB.awaitResults; when armed, waits for the promise and then resets both fields
+ // NOTE(review): the original comment said "called on main thread" -- confirm which thread actually runs this
+ public static void syncAndClear()
+ {
+ if (blockFor != null)
+ {
+ promise.join(); // blocks with no timeout until onFailure completes the promise
+ blockFor = null;
+ promise = null;
+ }
+ }
+ }
+
+ public static class BB
+ {
+ private static void install(ClassLoader cl, int instanceId) // passed as BB::install to builder.withInstanceInitializer
+ {
+ if (instanceId != 1) // only instance 1 gets the rewritten classes
+ return;
+ new ByteBuddy().rebase(ReadCallback.class)
+ .method(named("awaitResults"))
+ .intercept(MethodDelegation.to(BB.class))
+ .method(named("onFailure"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ new ByteBuddy().rebase(SEPExecutor.class)
+ .method(named("maybeExecuteImmediately"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ @SuppressWarnings("unused") // reached only through the MethodDelegation set up in install
+ public static void awaitResults(@SuperCall Runnable zuper)
+ {
+ State.syncAndClear(); // if the test armed State.blockFor, wait for that failure before running the original method
+ zuper.run();
+ }
+
+ @SuppressWarnings("unused") // reached only through the MethodDelegation set up in install
+ public static void onFailure(InetAddressAndPort from, RequestFailureReason failureReason, @SuperCall Runnable zuper) throws Exception
+ {
+ State.onFailure(new InetSocketAddress(from.getAddress(), from.getPort())); // convert to the type State compares against
+ zuper.run();
+ }
+
+ // make sure to schedule the task rather than running it inline...
+ // this is important as the read may block on the local version, which can get the test to include it rather than
+ // block waiting, so by scheduling we make sure it's always fair
+ @SuppressWarnings("unused")
+ public static void maybeExecuteImmediately(Runnable task, @This SEPExecutor executor)
+ {
+ executor.execute(task);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org