You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/06/09 12:42:38 UTC

[cassandra] branch trunk updated: Avoid blocking AntiEntropyStage when submitting validation requests

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04533e6  Avoid blocking AntiEntropyStage when submitting validation requests
04533e6 is described below

commit 04533e6cdae94f91a62d769c874156d81301dc7d
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Wed May 13 19:20:41 2020 +0100

    Avoid blocking AntiEntropyStage when submitting validation requests
    
    Patch by Sam Tunnicliffe; reviewed by Benjamin Lerer for CASSANDRA-15812
    
    Switches ValidationExecutor's work queue to LinkedBlockingQueue to
    avoid blocking AntiEntropyStage when the executor is saturated. This
    requires VE.corePoolSize to be set to concurrent_validations as now
    it will always prefer to queue requests rather than start new threads.
    
    This commit also adds a hard limit on concurrent_validations, as allowing
    an unbounded number of validations to run concurrently is never safe.
    This was always true, but setting a high value here is now more
    dangerous as it controls the number of core, not max, threads.
    This hard limit is linked to concurrent_compactors, so operators may
    set concurrent_validations between 1 and concurrent_compactors.
    The meaning of setting it < 1 has changed from "unbounded" to
    "whatever concurrent_compactors is set to".
    
    This safety valve can be overridden with a system property at startup
    and/or a JMX property.
    
    CASSANDRA-9292 removed the 1hr timeout on prepare messages, but this
    was inadvertently undone when CASSANDRA-13397 was committed. As nothing
    long running is done in the repair phase anymore, this timeout can
    safely be reduced.
    
    If using RepairCommandPoolFullStrategy.queue, the core pool size
    for repairCommandExecutor must be increased from the default
    value of 1 or else all concurrent tasks will be queued and no
    more threads created.
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   9 +-
 src/java/org/apache/cassandra/config/Config.java   |   2 +-
 .../cassandra/config/DatabaseDescriptor.java       |  29 ++++-
 .../cassandra/db/compaction/CompactionManager.java |  32 +++++-
 .../cassandra/service/ActiveRepairService.java     |  56 ++++++---
 .../apache/cassandra/service/StorageService.java   |  37 +++++-
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../cassandra/distributed/impl/Instance.java       |   2 +-
 .../distributed/test/RepairCoordinatorTimeout.java |   6 +-
 .../cassandra/config/DatabaseDescriptorTest.java   |  49 +++++++-
 .../db/compaction/ValidationExecutorTest.java      | 125 +++++++++++++++++++++
 .../cassandra/service/ActiveRepairServiceTest.java | 106 +++++++++++++++++
 13 files changed, 427 insertions(+), 31 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 81efa76..69e58a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Prevent validation request submission from blocking ANTI_ENTROPY stage (CASSANDRA-15812)
  * Add fqltool and auditlogviewer to rpm and deb packages (CASSANDRA-14712)
  * Include DROPPED_COLUMNS in schema digest computation (CASSANDRA-15843)
  * Fix Cassandra restart from rpm install (CASSANDRA-15830)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 3a44a09..79f6434 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -814,8 +814,13 @@ column_index_cache_size_in_kb: 2
 # to the number of cores.
 #concurrent_compactors: 1
 
-# Number of simultaneous repair validations to allow. Default is unbounded
-# Values less than one are interpreted as unbounded (the default)
+# Number of simultaneous repair validations to allow. If not set or set to
+# a value less than 1, it defaults to the value of concurrent_compactors.
+# To set a value greeater than concurrent_compactors at startup, the system
+# property cassandra.allow_unlimited_concurrent_validations must be set to
+# true. To dynamically resize to a value > concurrent_compactors on a running
+# node, first call the bypassConcurrentValidatorsLimit method on the
+# org.apache.cassandra.db:type=StorageService mbean
 # concurrent_validations: 0
 
 # Number of simultaneous materialized view builder tasks to allow.
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 6d42656..34ac049 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -216,7 +216,6 @@ public class Config
     public volatile int compaction_large_partition_warning_threshold_mb = 100;
     public int min_free_space_per_drive_in_mb = 50;
 
-    public volatile int concurrent_validations = Integer.MAX_VALUE;
     public volatile int concurrent_materialized_view_builders = 1;
 
     /**
@@ -416,6 +415,7 @@ public class Config
     public volatile boolean back_pressure_enabled = false;
     public volatile ParameterizedClass back_pressure_strategy;
 
+    public volatile int concurrent_validations;
     public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
     public int repair_command_pool_size = concurrent_validations;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5c753e0..a36ac1e 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -152,6 +152,8 @@ public class DatabaseDescriptor
     // turns some warnings into exceptions for testing
     private static final boolean strictRuntimeChecks = Boolean.getBoolean("cassandra.strict.runtime.checks");
 
+    public static volatile boolean allowUnlimitedConcurrentValidations = Boolean.getBoolean("cassandra.allow_unlimited_concurrent_validations");
+
     private static Function<CommitLog, AbstractCommitLogSegmentManager> commitLogSegmentMgrProvider = c -> DatabaseDescriptor.isCDCEnabled()
                                        ? new CommitLogSegmentManagerCDC(c, DatabaseDescriptor.getCommitLogLocation())
                                        : new CommitLogSegmentManagerStandard(c, DatabaseDescriptor.getCommitLogLocation());
@@ -668,12 +670,12 @@ public class DatabaseDescriptor
         if (conf.concurrent_compactors == null)
             conf.concurrent_compactors = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
 
-        if (conf.concurrent_validations < 1)
-            conf.concurrent_validations = Integer.MAX_VALUE;
-
         if (conf.concurrent_compactors <= 0)
             throw new ConfigurationException("concurrent_compactors should be strictly greater than 0, but was " + conf.concurrent_compactors, false);
 
+        applyConcurrentValidations(conf);
+        applyRepairCommandPoolSize(conf);
+
         if (conf.concurrent_materialized_view_builders <= 0)
             throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false);
 
@@ -841,6 +843,27 @@ public class DatabaseDescriptor
         validateMaxConcurrentAutoUpgradeTasksConf(conf.max_concurrent_automatic_sstable_upgrades);
     }
 
+    @VisibleForTesting
+    static void applyConcurrentValidations(Config config)
+    {
+        if (config.concurrent_validations < 1)
+        {
+            config.concurrent_validations = config.concurrent_compactors;
+        }
+        else if (config.concurrent_validations > config.concurrent_compactors && !allowUnlimitedConcurrentValidations)
+        {
+            throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
+                                             "set the system property cassandra.allow_unlimited_concurrent_validations=true");
+        }
+    }
+
+    @VisibleForTesting
+    static void applyRepairCommandPoolSize(Config config)
+    {
+        if (config.repair_command_pool_size < 1)
+            config.repair_command_pool_size = config.concurrent_validations;
+    }
+
     private static String storagedirFor(String type)
     {
         return storagedir(type + "_directory") + File.separator + type;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2b5ffe6..7df56fc 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -124,7 +124,7 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     private final CompactionExecutor executor = new CompactionExecutor();
-    private final CompactionExecutor validationExecutor = new ValidationExecutor();
+    private final ValidationExecutor validationExecutor = new ValidationExecutor();
     private final CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
     private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor();
 
@@ -1897,9 +1897,32 @@ public class CompactionManager implements CompactionManagerMBean
     // TODO: pull out relevant parts of CompactionExecutor and move to ValidationManager
     public static class ValidationExecutor extends CompactionExecutor
     {
+        // CompactionExecutor, and by extension ValidationExecutor, use DebuggableThreadPoolExecutor's
+        // default RejectedExecutionHandler which blocks the submitting thread when the work queue is
+        // full. The calling thread in this case is AntiEntropyStage, so in most cases we don't actually
+        // want to block when the ValidationExecutor is saturated as this prevents progress on all
+        // repair tasks and may cause repair sessions to time out. Also, it can lead to references to
+        // heavyweight validation responses containing merkle trees being held for extended periods which
+        // increases GC pressure. Using LinkedBlockingQueue instead of the default SynchronousQueue allows
+        // tasks to be submitted without blocking the caller, but will always prefer queueing to creating
+        // new threads if the pool already has at least `corePoolSize` threads already running. For this
+        // reason we set corePoolSize to the maximum desired concurrency, but allow idle core threads to
+        // be terminated.
+
         public ValidationExecutor()
         {
-            super(1, DatabaseDescriptor.getConcurrentValidations(), "ValidationExecutor", new SynchronousQueue<Runnable>());
+            super(DatabaseDescriptor.getConcurrentValidations(),
+                  DatabaseDescriptor.getConcurrentValidations(),
+                  "ValidationExecutor",
+                  new LinkedBlockingQueue());
+
+            allowCoreThreadTimeOut(true);
+        }
+
+        public void adjustPoolSize()
+        {
+            setMaximumPoolSize(DatabaseDescriptor.getConcurrentValidations());
+            setCorePoolSize(DatabaseDescriptor.getConcurrentValidations());
         }
     }
 
@@ -2021,10 +2044,9 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public void setConcurrentValidations(int value)
+    public void setConcurrentValidations()
     {
-        value = value > 0 ? value : Integer.MAX_VALUE;
-        validationExecutor.setMaximumPoolSize(value);
+        validationExecutor.adjustPoolSize();
     }
 
     public void setConcurrentViewBuilders(int value)
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index c679987..d386d37 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -126,24 +127,55 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
     private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<>();
 
-    public final static ExecutorService repairCommandExecutor;
     static
     {
-        Config.RepairCommandPoolFullStrategy strategy = DatabaseDescriptor.getRepairCommandPoolFullStrategy();
+        RepairMetrics.init();
+    }
+
+    public static class RepairCommandExecutorHandle
+    {
+        private static final ThreadPoolExecutor repairCommandExecutor =
+            initializeExecutor(DatabaseDescriptor.getRepairCommandPoolSize(),
+                               DatabaseDescriptor.getRepairCommandPoolFullStrategy());
+    }
+
+    @VisibleForTesting
+    static ThreadPoolExecutor initializeExecutor(int maxPoolSize, Config.RepairCommandPoolFullStrategy strategy)
+    {
+        int corePoolSize = 1;
         BlockingQueue<Runnable> queue;
         if (strategy == Config.RepairCommandPoolFullStrategy.reject)
+        {
+            // new threads will be created on demand up to max pool
+            // size so we can leave corePoolSize at 1 to start with
             queue = new SynchronousQueue<>();
+        }
         else
+        {
+            // new threads are only created if > corePoolSize threads are running
+            // and the queue is full, so set corePoolSize to the desired max as the
+            // queue will _never_ be full. Idle core threads will eventually time
+            // out and may be re-created if/when subsequent tasks are submitted.
+            corePoolSize = maxPoolSize;
             queue = new LinkedBlockingQueue<>();
+        }
 
-        repairCommandExecutor = new JMXEnabledThreadPoolExecutor(1,
-                                                                 DatabaseDescriptor.getRepairCommandPoolSize(),
-                                                                 1, TimeUnit.HOURS,
-                                                                 queue,
-                                                                 new NamedThreadFactory("Repair-Task"),
-                                                                 "internal",
-                                                                 new ThreadPoolExecutor.AbortPolicy());
-        RepairMetrics.init();
+        ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(corePoolSize,
+                                                                       maxPoolSize,
+                                                                       1,
+                                                                       TimeUnit.HOURS,
+                                                                       queue,
+                                                                       new NamedThreadFactory("Repair-Task"),
+                                                                       "internal",
+                                                                       new ThreadPoolExecutor.AbortPolicy());
+        // allow idle core threads to be terminated
+        executor.allowCoreThreadTimeOut(true);
+        return executor;
+    }
+
+    public static ThreadPoolExecutor repairCommandExecutor()
+    {
+        return RepairCommandExecutorHandle.repairCommandExecutor;
     }
 
     private final IFailureDetector failureDetector;
@@ -461,10 +493,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         }
         try
         {
-            // Failed repair is expensive so we wait for longer time.
-            if (!prepareLatch.await(1, TimeUnit.HOURS)) {
+            if (!prepareLatch.await(DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
                 failRepair(parentRepairSession, "Did not get replies from all endpoints.");
-            }
         }
         catch (InterruptedException e)
         {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0e42904..428dac1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1440,6 +1440,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         CompactionManager.instance.setConcurrentCompactors(value);
     }
 
+    public void bypassConcurrentValidatorsLimit()
+    {
+        logger.info("Enabling the ability to set concurrent validations to an unlimited value");
+        DatabaseDescriptor.allowUnlimitedConcurrentValidations = true ;
+    }
+
+    public void enforceConcurrentValidatorsLimit()
+    {
+        logger.info("Disabling the ability to set concurrent validations to an unlimited value");
+        DatabaseDescriptor.allowUnlimitedConcurrentValidations = false ;
+    }
+
+    public boolean isConcurrentValidatorsLimitEnforced()
+    {
+        return DatabaseDescriptor.allowUnlimitedConcurrentValidations;
+    }
+
     public int getConcurrentValidators()
     {
         return DatabaseDescriptor.getConcurrentValidations();
@@ -1447,8 +1464,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void setConcurrentValidators(int value)
     {
+        int concurrentCompactors = DatabaseDescriptor.getConcurrentCompactors();
+        if (value > concurrentCompactors && !DatabaseDescriptor.allowUnlimitedConcurrentValidations)
+            throw new IllegalArgumentException(
+            String.format("Cannot set concurrent_validations greater than concurrent_compactors (%d)",
+                          concurrentCompactors));
+
+        if (value <= 0)
+        {
+            logger.info("Using default value of concurrent_compactors ({}) for concurrent_validations", concurrentCompactors);
+            value = concurrentCompactors;
+        }
+        else
+        {
+            logger.info("Setting concurrent_validations to {}", value);
+        }
+
         DatabaseDescriptor.setConcurrentValidations(value);
-        CompactionManager.instance.setConcurrentValidations(DatabaseDescriptor.getConcurrentValidations());
+        CompactionManager.instance.setConcurrentValidations();
     }
 
     public int getConcurrentViewBuilders()
@@ -3733,7 +3766,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return Pair.create(0, Futures.immediateFuture(null));
 
         int cmd = nextRepairCommand.incrementAndGet();
-        return Pair.create(cmd, ActiveRepairService.repairCommandExecutor.submit(createRepairTask(cmd, keyspace, option, listeners)));
+        return Pair.create(cmd, ActiveRepairService.repairCommandExecutor().submit(createRepairTask(cmd, keyspace, option, listeners)));
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f8488c7..9664769 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -581,6 +581,10 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int getConcurrentCompactors();
     public void setConcurrentCompactors(int value);
 
+    public void bypassConcurrentValidatorsLimit();
+    public void enforceConcurrentValidatorsLimit();
+    public boolean isConcurrentValidatorsLimitEnforced();
+
     public int getConcurrentValidators();
     public void setConcurrentValidators(int value);
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 908ccb2..c3e9982 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -553,7 +553,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
                                 () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                 () -> SSTableReader.shutdownBlocking(1L, MINUTES),
-                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor)),
+                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
                                 () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
             );
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
index 7a08187..f523396 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
@@ -42,19 +42,19 @@ public abstract class RepairCoordinatorTimeout extends RepairCoordinatorBase
             NodeToolResult result = repair(1, KEYSPACE, table);
             result.asserts()
                   .failure()
-                  .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
+                  .errorContains("Did not get replies from all endpoints.");
             if (withNotifications)
             {
                 result.asserts()
                       .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
                       .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
+                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Did not get replies from all endpoints.")
                       .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
             }
 
             if (repairType != RepairType.PREVIEW)
             {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Did not get replies from all endpoints.");
             }
             else
             {
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 4a6a452..2992a60 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -26,9 +26,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
 
+
 import com.google.common.base.Throwables;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,6 +37,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -455,4 +456,50 @@ public class DatabaseDescriptorTest
         assertEquals(100, // total size is more than preferred so keep the configured limit
                      DatabaseDescriptor.calculateDefaultSpaceInMB("type", "/path", "setting_name", preferredInMB, spaceInBytes, numerator, denominator));
     }
+
+    @Test
+    public void testConcurrentValidations()
+    {
+        Config conf = new Config();
+        conf.concurrent_compactors = 8;
+        // if concurrent_validations is < 1 (including being unset) it should default to concurrent_compactors
+        assertThat(conf.concurrent_validations).isLessThan(1);
+        DatabaseDescriptor.applyConcurrentValidations(conf);
+        assertThat(conf.concurrent_validations).isEqualTo(conf.concurrent_compactors);
+
+        // otherwise, it must be <= concurrent_compactors
+        conf.concurrent_validations = conf.concurrent_compactors + 1;
+        try
+        {
+            DatabaseDescriptor.applyConcurrentValidations(conf);
+            fail("Expected exception");
+        }
+        catch (ConfigurationException e)
+        {
+            assertThat(e.getMessage()).isEqualTo("To set concurrent_validations > concurrent_compactors, " +
+                                                 "set the system property cassandra.allow_unlimited_concurrent_validations=true");
+        }
+
+        // unless we disable that check (done with a system property at startup or via JMX)
+        DatabaseDescriptor.allowUnlimitedConcurrentValidations = true;
+        conf.concurrent_validations = conf.concurrent_compactors + 1;
+        DatabaseDescriptor.applyConcurrentValidations(conf);
+        assertThat(conf.concurrent_validations).isEqualTo(conf.concurrent_compactors + 1);
+    }
+
+    @Test
+    public void testRepairCommandPoolSize()
+    {
+        Config conf = new Config();
+        conf.concurrent_validations = 3;
+        // if repair_command_pool_size is < 1 (including being unset) it should default to concurrent_validations
+        assertThat(conf.repair_command_pool_size).isLessThan(1);
+        DatabaseDescriptor.applyRepairCommandPoolSize(conf);
+        assertThat(conf.repair_command_pool_size).isEqualTo(conf.concurrent_validations);
+
+        // but it can be overridden
+        conf.repair_command_pool_size = conf.concurrent_validations + 1;
+        DatabaseDescriptor.applyRepairCommandPoolSize(conf);
+        assertThat(conf.repair_command_pool_size).isEqualTo(conf.concurrent_validations + 1);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/compaction/ValidationExecutorTest.java b/test/unit/org/apache/cassandra/db/compaction/ValidationExecutorTest.java
new file mode 100644
index 0000000..a175bdd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/ValidationExecutorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+public class ValidationExecutorTest
+{
+
+    CompactionManager.ValidationExecutor validationExecutor;
+
+    @Before
+    public void setup()
+    {
+        DatabaseDescriptor.clientInitialization();
+        // required for static initialization of CompactionManager
+        DatabaseDescriptor.setConcurrentCompactors(2);
+        DatabaseDescriptor.setConcurrentValidations(2);
+
+        // shutdown the singleton CompactionManager to ensure MBeans are unregistered
+        CompactionManager.instance.forceShutdown();
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (null != validationExecutor)
+            validationExecutor.shutdownNow();
+    }
+
+    @Test
+    public void testQueueOnValidationSubmission() throws InterruptedException
+    {
+        Condition taskBlocked = new SimpleCondition();
+        AtomicInteger threadsAvailable = new AtomicInteger(DatabaseDescriptor.getConcurrentValidations());
+        CountDownLatch taskComplete = new CountDownLatch(5);
+        validationExecutor = new CompactionManager.ValidationExecutor();
+
+        ExecutorService testExecutor = Executors.newSingleThreadExecutor();
+        for (int i=0; i< 5; i++)
+            testExecutor.submit(() -> {
+                threadsAvailable.decrementAndGet();
+                validationExecutor.submit(new Task(taskBlocked, taskComplete));
+            });
+
+        // wait for all tasks to be submitted & check that the excess ones were queued
+        while (threadsAvailable.get() > 0)
+            TimeUnit.MILLISECONDS.sleep(10);
+
+        assertEquals(2, validationExecutor.getActiveTaskCount());
+        assertEquals(3, validationExecutor.getPendingTaskCount());
+
+        taskBlocked.signalAll();
+        taskComplete.await(10, TimeUnit.SECONDS);
+        validationExecutor.shutdownNow();
+    }
+
+    @Test
+    public void testAdjustPoolSize()
+    {
+        // adjusting the pool size should dynamically set core and max pool
+        // size to DatabaseDescriptor::getConcurrentValidations
+
+        validationExecutor = new CompactionManager.ValidationExecutor();
+
+        int corePoolSize = validationExecutor.getCorePoolSize();
+        int maxPoolSize = validationExecutor.getMaximumPoolSize();
+
+        DatabaseDescriptor.setConcurrentValidations(corePoolSize * 2);
+        validationExecutor.adjustPoolSize();
+        assertThat(validationExecutor.getCorePoolSize()).isEqualTo(corePoolSize * 2);
+        assertThat(validationExecutor.getMaximumPoolSize()).isEqualTo(maxPoolSize * 2);
+        validationExecutor.shutdownNow();
+    }
+
+    private static class Task implements Runnable
+    {
+        private final Condition blocked;
+        private final CountDownLatch complete;
+
+        Task(Condition blocked, CountDownLatch complete)
+        {
+            this.blocked = blocked;
+            this.complete = complete;
+        }
+
+        public void run()
+        {
+            Uninterruptibles.awaitUninterruptibly(blocked, 10, TimeUnit.SECONDS);
+            complete.countDown();
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 4f7cde0..8883f21 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -19,15 +19,26 @@
 package org.apache.cassandra.service;
 
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -47,6 +58,7 @@ import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.apache.cassandra.repair.messages.RepairOption.DATACENTERS_KEY;
 import static org.apache.cassandra.repair.messages.RepairOption.FORCE_REPAIR_KEY;
@@ -352,4 +364,98 @@ public class ActiveRepairServiceTest
         // full repair
         Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false)), false));
     }
+
+    @Test
+    public void testRejectWhenPoolFullStrategy() throws InterruptedException
+    {
+        // Using RepairCommandPoolFullStrategy.reject, new threads are spawned up to
+        // repair_command_pool_size, at which point futher submissions are rejected
+        ExecutorService validationExecutor = ActiveRepairService.initializeExecutor(2, Config.RepairCommandPoolFullStrategy.reject);
+        try
+        {
+            Condition blocked = new SimpleCondition();
+            CountDownLatch completed = new CountDownLatch(2);
+            validationExecutor.submit(new Task(blocked, completed));
+            validationExecutor.submit(new Task(blocked, completed));
+            try
+            {
+                validationExecutor.submit(new Task(blocked, completed));
+                Assert.fail("Expected task submission to be rejected");
+            }
+            catch (RejectedExecutionException e)
+            {
+                // expected
+            }
+            // allow executing tests to complete
+            blocked.signalAll();
+            completed.await(10, TimeUnit.SECONDS);
+            // Submission is unblocked
+            validationExecutor.submit(() -> {});
+        }
+        finally
+        {
+            // necessary to unregister mbean
+            validationExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testQueueWhenPoolFullStrategy() throws InterruptedException
+    {
+        // Using RepairCommandPoolFullStrategy.queue, the pool is initialized to
+        // repair_command_pool_size and any tasks which cannot immediately be
+        // serviced are queued
+        ExecutorService validationExecutor = ActiveRepairService.initializeExecutor(2, Config.RepairCommandPoolFullStrategy.queue);
+        try
+        {
+            Condition allSubmitted = new SimpleCondition();
+            Condition blocked = new SimpleCondition();
+            CountDownLatch completed = new CountDownLatch(5);
+            ExecutorService testExecutor = Executors.newSingleThreadExecutor();
+            for (int i = 0; i < 5; i++)
+            {
+                if (i < 4)
+                    testExecutor.submit(() -> validationExecutor.submit(new Task(blocked, completed)));
+                else
+                    testExecutor.submit(() -> {
+                        validationExecutor.submit(new Task(blocked, completed));
+                        allSubmitted.signalAll();
+                    });
+            }
+
+            // Make sure all tasks have been submitted to the validation executor
+            allSubmitted.await(10, TimeUnit.SECONDS);
+
+            // 2 threads actively processing tasks
+            Assert.assertEquals(2, ((DebuggableThreadPoolExecutor) validationExecutor).getActiveTaskCount());
+            // 3 tasks queued
+            Assert.assertEquals(3, ((DebuggableThreadPoolExecutor) validationExecutor).getPendingTaskCount());
+            // allow executing tests to complete
+            blocked.signalAll();
+            completed.await(10, TimeUnit.SECONDS);
+        }
+        finally
+        {
+            // necessary to unregister mbean
+            validationExecutor.shutdownNow();
+        }
+    }
+
+    private static class Task implements Runnable
+    {
+        private final Condition blocked;
+        private final CountDownLatch complete;
+
+        Task(Condition blocked, CountDownLatch complete)
+        {
+            this.blocked = blocked;
+            this.complete = complete;
+        }
+
+        public void run()
+        {
+            Uninterruptibles.awaitUninterruptibly(blocked, 10, TimeUnit.SECONDS);
+            complete.countDown();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org