You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/06/06 20:33:34 UTC

[cassandra] branch cassandra-4.1 updated: Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new d3ce825bf2 Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition
d3ce825bf2 is described below

commit d3ce825bf2b376fd2516e4b594ddb69037c13159
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Jun 3 14:37:39 2022 -0700

    Fix org.apache.cassandra.distributed.test.trackwarnings.TombstoneCountWarningTest.failThresholdSinglePartition
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-17244
---
 .../test/thresholds/TombstoneCountWarningTest.java | 113 +++++++++++++++++++++
 1 file changed, 113 insertions(+)

diff --git a/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java
index 04668a9589..5e409cbc31 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/thresholds/TombstoneCountWarningTest.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.distributed.test.thresholds;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableSet;
@@ -33,9 +35,17 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.SimpleStatement;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.concurrent.SEPExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.distributed.Cluster;
@@ -49,16 +59,22 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.ReadCallback;
 import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.utils.Shared;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Condition;
 
+import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TombstoneCountWarningTest extends TestBaseImpl
 {
+    private static final Logger logger = LoggerFactory.getLogger(TombstoneCountWarningTest.class);
+
     private static final int TOMBSTONE_WARN = 50;
     private static final int TOMBSTONE_FAIL = 100;
     private static ICluster<IInvokableInstance> CLUSTER;
@@ -68,10 +84,12 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     @BeforeClass
     public static void setupClass() throws IOException
     {
+        logger.info("[test step : @BeforeClass] setupClass");
         Cluster.Builder builder = Cluster.build(3);
         builder.withConfig(c -> c.set("tombstone_warn_threshold", TOMBSTONE_WARN)
                                  .set("tombstone_failure_threshold", TOMBSTONE_FAIL)
                                  .with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+        builder.withInstanceInitializer(BB::install);
         CLUSTER = builder.start();
         JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
         JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
@@ -80,6 +98,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     @AfterClass
     public static void teardown()
     {
+        logger.info("[test step : @AfterClass] teardown");
         if (JAVA_DRIVER_SESSION != null)
             JAVA_DRIVER_SESSION.close();
         if (JAVA_DRIVER != null)
@@ -89,6 +108,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     @Before
     public void setup()
     {
+        logger.info("[test step : @Before] setup");
         CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
         init(CLUSTER);
         CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
@@ -102,12 +122,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     @Test
     public void noWarningsSinglePartition()
     {
+        logger.info("[test step : @Test] noWarningsSinglePartition");
         noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
     }
 
     @Test
     public void noWarningsScan()
     {
+        logger.info("[test step : @Test] noWarningsScan");
         noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
     }
 
@@ -134,12 +156,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     @Test
     public void warnThresholdSinglePartition()
     {
+        logger.info("[test step : @Test] warnThresholdSinglePartition");
         warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
     }
 
     @Test
     public void warnThresholdScan()
     {
+        logger.info("[test step : @Test] warnThresholdScan");
         warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
     }
 
@@ -183,12 +207,14 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     @Test
     public void failThresholdSinglePartition() throws UnknownHostException
     {
+        logger.info("[test step : @Test] failThresholdSinglePartition");
         failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
     }
 
     @Test
     public void failThresholdScan() throws UnknownHostException
     {
+        logger.info("[test step : @Test] failThresholdScan");
         failThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
     }
 
@@ -248,7 +274,10 @@ public class TombstoneCountWarningTest extends TestBaseImpl
 
         assertWarnAborts(0, 2, 1);
 
+        // when disabled, warnings only happen if on the coordinator, and the coordinator may not be the one
+        // replying to every query
         enable(false);
+        State.blockFor(CLUSTER.get(1).config().broadcastAddress());
         warnings = CLUSTER.get(1).callsOnInstance(() -> {
             ClientWarn.instance.captureWarnings();
             try
@@ -259,7 +288,12 @@ public class TombstoneCountWarningTest extends TestBaseImpl
             catch (ReadFailureException e)
             {
                 Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class);
+                Assertions.assertThat(e.failureReasonByEndpoint).isNotEmpty();
+                Assertions.assertThat(e.failureReasonByEndpoint.values())
+                          .as("Non READ_TOO_MANY_TOMBSTONES exists")
+                          .allMatch(RequestFailureReason.READ_TOO_MANY_TOMBSTONES::equals);
             }
+            logger.warn("Checking warnings...");
             return ClientWarn.instance.getWarnings();
         }).call();
         // client warnings are currently coordinator only, so if present only 1 is expected
@@ -276,6 +310,7 @@ public class TombstoneCountWarningTest extends TestBaseImpl
 
         assertWarnAborts(0, 2, 0);
 
+        State.blockFor(CLUSTER.get(1).config().broadcastAddress());
         try
         {
             driverQueryAll(cql);
@@ -327,4 +362,82 @@ public class TombstoneCountWarningTest extends TestBaseImpl
     {
         return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
     }
+
+    @Shared
+    public static class State
+    {
+        // use InetSocketAddress because InetAddressAndPort is @Isolated, which means equality doesn't work across
+        // different ClassLoaders; InetSocketAddress is @Shared so safe to use between app and cluster class loaders
+        public static volatile InetSocketAddress blockFor = null;
+        public static volatile CompletableFuture<Void> promise = null;
+
+        // called on main thread; arms the wait so syncAndClear() blocks until onFailure() is seen for this address
+        public static void blockFor(InetSocketAddress address)
+        {
+            blockFor = address;
+            promise = new CompletableFuture<>();
+        }
+
+        // called in C* threads (non-test threads); completes the promise when the failure is from the awaited address
+        public static void onFailure(InetSocketAddress address)
+        {
+            if (address.equals(blockFor))
+                promise.complete(null);
+        }
+
+        // called by BB.awaitResults; no-op unless armed, else waits for the failure then resets (NOTE(review): thread not shown here)
+        public static void syncAndClear()
+        {
+            if (blockFor != null)
+            {
+                promise.join();
+                blockFor = null;
+                promise = null;
+            }
+        }
+    }
+
+    public static class BB
+    {
+        private static void install(ClassLoader cl, int instanceId)
+        {
+            if (instanceId != 1)
+                return;
+            new ByteBuddy().rebase(ReadCallback.class)
+                           .method(named("awaitResults"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .method(named("onFailure"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+            new ByteBuddy().rebase(SEPExecutor.class)
+                           .method(named("maybeExecuteImmediately"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static void awaitResults(@SuperCall Runnable zuper)
+        {
+            State.syncAndClear();
+            zuper.run();
+        }
+
+        @SuppressWarnings("unused")
+        public static void onFailure(InetAddressAndPort from, RequestFailureReason failureReason, @SuperCall Runnable zuper) throws Exception
+        {
+            State.onFailure(new InetSocketAddress(from.getAddress(), from.getPort()));
+            zuper.run();
+        }
+
+        // make sure to schedule the task rather than running inline...
+        // this is important as the read may block on the local version which can get the test to include it rather than
+        // block waiting, so by scheduling we make sure it's always fair
+        @SuppressWarnings("unused")
+        public static void maybeExecuteImmediately(Runnable task, @This SEPExecutor executor)
+        {
+            executor.execute(task);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org