Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/11/19 01:32:57 UTC

[cassandra] branch trunk updated (33fd2dc -> 10c6852)

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 33fd2dc  Log queries that fail on timeout or unavailable errors up to once per minute by default
     new 1bcfa08  DebuggableThreadPoolExecutor does not propagate client warnings
     new 10c6852  Merge branch 'cassandra-4.0' into trunk

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/concurrent/Stage.java     |   1 +
 .../statements/schema/AlterKeyspaceStatement.java  |   8 --
 .../statements/schema/CreateKeyspaceStatement.java |  13 +--
 .../org/apache/cassandra/service/ClientWarn.java   |   5 -
 .../DebuggableThreadPoolExecutorTest.java          |  97 +++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |  27 +++++
 .../schema/SchemaStatementWarningsTest.java        | 117 +++++++++++++++++++++
 8 files changed, 244 insertions(+), 25 deletions(-)
 create mode 100644 test/unit/org/apache/cassandra/schema/SchemaStatementWarningsTest.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-4.0' into trunk

Posted by dc...@apache.org.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 10c685222fc415586ae28a01e7896063a3f2f0d3
Merge: 33fd2dc 1bcfa08
Author: David Capwell <dc...@apache.org>
AuthorDate: Thu Nov 18 17:29:23 2021 -0800

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/concurrent/Stage.java     |   1 +
 .../statements/schema/AlterKeyspaceStatement.java  |   8 --
 .../statements/schema/CreateKeyspaceStatement.java |  13 +--
 .../org/apache/cassandra/service/ClientWarn.java   |   5 -
 .../DebuggableThreadPoolExecutorTest.java          |  97 +++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |  27 +++++
 .../schema/SchemaStatementWarningsTest.java        | 117 +++++++++++++++++++++
 8 files changed, 244 insertions(+), 25 deletions(-)

diff --cc CHANGES.txt
index 676e92b,1d72c0f..316d14b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,65 -1,5 +1,66 @@@
 -4.0.2
 +4.1
 + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution (CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337)
 + * add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
 + * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
 +Merged from 4.0:
+  * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072)
   * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141)
   * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132)
   * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052)
diff --cc src/java/org/apache/cassandra/concurrent/Stage.java
index 66cd7cb,e00da7b..d8a5e54
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@@ -158,47 -166,39 +158,48 @@@ public enum Stag
          ExecutorUtils.awaitTermination(timeout, units, executors);
      }
  
 -    static LocalAwareExecutorService tracingExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    private static ExecutorPlus tracingStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    {
 +        return executorFactory()
 +                .withJmx(jmxType)
 +                .configureSequential(jmxName)
 +                .withQueueLimit(1000)
 +                .withRejectedExecutionHandler((r, executor) -> MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE)).build();
 +    }
 +
 +    private static ExecutorPlus migrationStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize)
      {
 -        RejectedExecutionHandler reh = (r, executor) -> MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE);
 -        return new TracingExecutor(1,
 -                                   1,
 -                                   KEEP_ALIVE_SECONDS,
 -                                   TimeUnit.SECONDS,
 -                                   new ArrayBlockingQueue<>(1000),
 -                                   new NamedThreadFactory(jmxName),
 -                                   reh);
 +        return executorFactory()
++               .localAware()
 +               .withJmx(jmxType)
 +               .sequential(jmxName);
      }
  
 -    static LocalAwareExecutorService multiThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    private static LocalAwareExecutorPlus singleThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize)
      {
 -        return new JMXEnabledThreadPoolExecutor(numThreads,
 -                                                KEEP_ALIVE_SECONDS,
 -                                                TimeUnit.SECONDS,
 -                                                new LinkedBlockingQueue<>(),
 -                                                new NamedThreadFactory(jmxName),
 -                                                jmxType);
 +        return executorFactory()
 +                .localAware()
 +                .withJmx(jmxType)
 +                .sequential(jmxName);
      }
  
 -    static LocalAwareExecutorService multiThreadedLowSignalStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    static LocalAwareExecutorPlus multiThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize)
      {
 -        return SharedExecutorPool.SHARED.newExecutor(numThreads, onSetMaximumPoolSize, jmxType, jmxName);
 +        return executorFactory()
 +                .localAware()
 +                .withJmx(jmxType)
 +                .pooled(jmxName, numThreads);
      }
  
 -    static LocalAwareExecutorService singleThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    static LocalAwareExecutorPlus multiThreadedLowSignalStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize)
      {
 -        return new JMXEnabledSingleThreadExecutor(jmxName, jmxType);
 +        return executorFactory()
 +                .localAware()
 +                .withJmx(jmxType)
 +                .shared(jmxName, numThreads, onSetMaximumPoolSize);
      }
  
 -    static LocalAwareExecutorService immediateExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    static LocalAwareExecutorPlus immediateExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize)
      {
          return ImmediateExecutor.INSTANCE;
      }
diff --cc src/java/org/apache/cassandra/service/ClientWarn.java
index 46a42c7,5a6a878..6e2d3fc
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@@ -64,14 -64,9 +64,9 @@@ public class ClientWarn extends Executo
          return state.warnings;
      }
  
-     public int numWarnings()
-     {
-         return getWarnings() == null ? 0 : getWarnings().size();
-     }
- 
      public void resetWarnings()
      {
 -        warnLocal.remove();
 +        set(null);
      }
  
      public static class State
diff --cc test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
index 17253ec,43c0fdf..adb0962
--- a/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
@@@ -29,6 -32,8 +29,7 @@@ import java.util.function.Supplier
  
  import com.google.common.base.Throwables;
  import com.google.common.net.InetAddresses;
 -import com.google.common.util.concurrent.ListenableFutureTask;
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Assert;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -38,11 -44,12 +40,16 @@@ import org.apache.cassandra.service.Cli
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.TraceStateImpl;
  import org.apache.cassandra.tracing.Tracing;
+ import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.WrappedRunnable;
+ import org.assertj.core.api.Assertions;
  
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +
+ import static java.util.concurrent.TimeUnit.MILLISECONDS;
++import static org.assertj.core.api.Assertions.assertThat;
+ 
  public class DebuggableThreadPoolExecutorTest
  {
      @BeforeClass
@@@ -75,14 -87,82 +82,104 @@@
      }
  
      @Test
+     public void testLocalStatePropagation()
+     {
 -        DebuggableThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("TEST", 1);
++        ExecutorPlus executor = executorFactory().localAware().sequential("TEST");
++        assertThat(executor).isInstanceOf(LocalAwareExecutorPlus.class);
++        try
++        {
++            checkLocalStateIsPropagated(executor);
++        }
++        finally
++        {
++            executor.shutdown();
++        }
++    }
++
++    @Test
++    public void testNoLocalStatePropagation() throws InterruptedException
++    {
++        ExecutorPlus executor = executorFactory().sequential("TEST");
++        assertThat(executor).isNotInstanceOf(LocalAwareExecutorPlus.class);
+         try
+         {
+             checkLocalStateIsPropagated(executor);
+         }
+         finally
+         {
+             executor.shutdown();
+         }
+     }
+ 
 -    public static void checkLocalStateIsPropagated(LocalAwareExecutorService executor)
++    public static void checkLocalStateIsPropagated(ExecutorPlus executor)
+     {
+         checkClientWarningsArePropagated(executor, () -> executor.execute(() -> ClientWarn.instance.warn("msg")));
+         checkClientWarningsArePropagated(executor, () -> executor.submit(() -> ClientWarn.instance.warn("msg")));
+         checkClientWarningsArePropagated(executor, () -> executor.submit(() -> ClientWarn.instance.warn("msg"), null));
+         checkClientWarningsArePropagated(executor, () -> executor.submit((Callable<Void>) () -> {
+             ClientWarn.instance.warn("msg");
+             return null;
+         }));
+ 
+         checkTracingIsPropagated(executor, () -> executor.execute(() -> Tracing.trace("msg")));
+         checkTracingIsPropagated(executor, () -> executor.submit(() -> Tracing.trace("msg")));
+         checkTracingIsPropagated(executor, () -> executor.submit(() -> Tracing.trace("msg"), null));
+         checkTracingIsPropagated(executor, () -> executor.submit((Callable<Void>) () -> {
+             Tracing.trace("msg");
+             return null;
+         }));
+     }
+ 
 -    public static void checkClientWarningsArePropagated(LocalAwareExecutorService executor, Runnable schedulingTask) {
++    public static void checkClientWarningsArePropagated(ExecutorPlus executor, Runnable schedulingTask) {
+         ClientWarn.instance.captureWarnings();
 -        Assertions.assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
++        assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
+ 
+         ClientWarn.instance.warn("msg0");
+         long initCompletedTasks = executor.getCompletedTaskCount();
+         schedulingTask.run();
+         while (executor.getCompletedTaskCount() == initCompletedTasks) Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS);
+         ClientWarn.instance.warn("msg1");
+ 
 -        Assertions.assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", "msg", "msg1");
++        if (executor instanceof LocalAwareExecutorPlus)
++            assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", "msg", "msg1");
++        else
++            assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", "msg1");
+     }
+ 
 -    public static void checkTracingIsPropagated(LocalAwareExecutorService executor, Runnable schedulingTask) {
++    public static void checkTracingIsPropagated(ExecutorPlus executor, Runnable schedulingTask) {
+         ClientWarn.instance.captureWarnings();
 -        Assertions.assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
++        assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
+ 
+         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
+         Tracing.instance.set(new TraceState(FBUtilities.getLocalAddressAndPort(), UUID.randomUUID(), Tracing.TraceType.NONE)
+         {
+             @Override
+             protected void traceImpl(String message)
+             {
+                 q.add(message);
+             }
+         });
+         Tracing.trace("msg0");
+         long initCompletedTasks = executor.getCompletedTaskCount();
+         schedulingTask.run();
+         while (executor.getCompletedTaskCount() == initCompletedTasks) Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS);
+         Tracing.trace("msg1");
+ 
 -        Assertions.assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg", "msg1");
++        if (executor instanceof LocalAwareExecutorPlus)
++            assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg", "msg1");
++        else
++            assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg1");
+     }
+ 
+     @Test
      public void testExecuteFutureTaskWhileTracing()
      {
 -        LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
 -        DebuggableThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(1,
 -                                                                                 Integer.MAX_VALUE,
 -                                                                                 TimeUnit.MILLISECONDS,
 -                                                                                 q,
 -                                                                                 new NamedThreadFactory("TEST"));
 +        SettableUncaughtExceptionHandler ueh = new SettableUncaughtExceptionHandler();
 +        ExecutorPlus executor = executorFactory()
 +                                .localAware()
 +                                .configureSequential("TEST")
 +                                .withUncaughtExceptionHandler(ueh)
 +                                .withQueueLimit(1).build();
          Runnable test = () -> executor.execute(failingTask());
          try
          {
diff --cc test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index 2a8aeb9,97e389c..7a682ed
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@@ -36,6 -38,7 +38,8 @@@ import org.apache.cassandra.utils.FBUti
  
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.MINUTES;
+ import static org.apache.cassandra.concurrent.DebuggableThreadPoolExecutorTest.checkLocalStateIsPropagated;
++import static org.assertj.core.api.Assertions.assertThat;
  
  public class SEPExecutorTest
  {
@@@ -260,4 -270,20 +270,21 @@@
          // Will return true if all of the LatchWaiters count down before the timeout
          Assert.assertTrue("Test tasks did not hit max concurrency goal", concurrencyGoal.await(3L, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testLocalStatePropagation() throws InterruptedException, TimeoutException
+     {
+         SharedExecutorPool sharedPool = new SharedExecutorPool("TestPool");
+         try
+         {
 -            LocalAwareExecutorService executor = sharedPool.newExecutor(1, "TEST", "TEST");
++            LocalAwareExecutorPlus executor = sharedPool.newExecutor(1, "TEST", "TEST");
++            assertThat(executor).isInstanceOf(LocalAwareExecutorPlus.class);
+             checkLocalStateIsPropagated(executor);
+         }
+         finally
+         {
+             sharedPool.shutdownAndWait(1, TimeUnit.SECONDS);
+         }
+     }
+ 
  }
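
The behaviour the tests in this diff assert (a local-aware executor hands the submitting thread's client warnings to the worker thread, while a plain executor does not) can be sketched with nothing but java.util.concurrent. This is a minimal illustration, not Cassandra's implementation: the WARNINGS thread-local, warn(), localAware() and run() are hypothetical stand-ins for ClientWarn and the LocalAwareExecutorPlus machinery, and the real classes may work quite differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LocalStatePropagationSketch
{
    // Hypothetical per-thread warning state; NOT Cassandra's ClientWarn.
    static final ThreadLocal<List<String>> WARNINGS = new ThreadLocal<>();

    // Record a warning only if the current thread has state installed.
    static void warn(String msg)
    {
        List<String> warnings = WARNINGS.get();
        if (warnings != null)
            warnings.add(msg);
    }

    // Capture the caller's state at submit time, install it on the worker
    // for the duration of the task, then restore whatever was there before.
    static Runnable localAware(Runnable task)
    {
        List<String> captured = WARNINGS.get();
        return () -> {
            List<String> previous = WARNINGS.get();
            WARNINGS.set(captured);
            try
            {
                task.run();
            }
            finally
            {
                WARNINGS.set(previous);
            }
        };
    }

    // Mirrors the shape of checkClientWarningsArePropagated: warn before,
    // warn inside the task, warn after, then return what the caller sees.
    static List<String> run(boolean propagate) throws Exception
    {
        List<String> warnings = new ArrayList<>();
        WARNINGS.set(warnings);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            warn("msg0");
            Runnable task = () -> warn("msg");
            // get() waits for completion, so the task's write is visible here.
            executor.submit(propagate ? localAware(task) : task).get();
            warn("msg1");
            return new ArrayList<>(warnings);
        }
        finally
        {
            WARNINGS.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(run(true));  // wrapped task: worker sees the caller's state
        System.out.println(run(false)); // plain task: worker has no state, "msg" is lost
    }
}
```

With the wrapper the caller ends up with msg0, msg and msg1; without it the warning raised on the worker thread is silently dropped, which is the failure mode described by "DebuggableThreadPoolExecutor does not propagate client warnings".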
