Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/11/02 12:35:29 UTC

[cassandra] branch CASSANDRA-16186 created (now 5b7a828)

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a change to branch CASSANDRA-16186
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


      at 5b7a828  CIRCLE-CI do not merge

This branch includes the following new commits:

     new 92eb92a  Fix the blocking behavior of SEPExecutor
     new 5b7a828  CIRCLE-CI do not merge

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[cassandra] 01/02: Fix the blocking behavior of SEPExecutor

Posted by bl...@apache.org.

blerer pushed a commit to branch CASSANDRA-16186
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 92eb92afcd46ec982465e658396d01698572e0b5
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Mon Oct 12 12:59:03 2020 +0200

    Fix the blocking behavior of SEPExecutor
    
    If the number of tasks added to a SEPExecutor exceeds the max queue size,
    the threads adding those tasks are blocked until enough space becomes
    available for all of them, at which point all the blocked threads are
    released at once. This behavior is not correct: a blocked thread should be
    released as soon as a single space becomes available.
    
    The number of tasks reported as pending was also wrong as it included the
    blocked threads.
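
A minimal sketch of the corrected semantics, using a plain
java.util.concurrent.Semaphore (an illustration only, not the Cassandra
source): with one permit per queue slot, each freed slot releases exactly one
blocked producer, instead of every waiter being woken once the queue drops
below its limit.

    import java.util.concurrent.Semaphore;

    class PerSlotGate
    {
        // one permit per queue slot, so release() wakes at most one producer
        private final Semaphore slots;

        PerSlotGate(int maxTasksQueued)
        {
            slots = new Semaphore(maxTasksQueued);
        }

        void beforeEnqueue() throws InterruptedException
        {
            slots.acquire();    // blocks the producer only while the queue is full
        }

        void afterDequeue()
        {
            slots.release();    // frees exactly one slot, waking one producer
        }
    }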
---
 .../apache/cassandra/concurrent/SEPExecutor.java   |  38 +--
 .../cassandra/concurrent/SharedExecutorPool.java   |   8 +-
 .../org/apache/cassandra/concurrent/Stage.java     |   2 +-
 .../org/apache/cassandra/transport/Message.java    |   1 -
 .../concurrent/LongSharedExecutorPoolTest.java     |   8 +-
 .../cassandra/concurrent/SEPExecutorTest.java      |   4 +-
 .../cassandra/metrics/ThreadPoolMetricsTest.java   | 289 +++++++++++++++++++++
 7 files changed, 303 insertions(+), 47 deletions(-)

diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
index ee939a2..5981123 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 import static org.apache.cassandra.concurrent.SEPWorker.Work;
 
@@ -43,16 +42,13 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
     private final MaximumPoolSizeListener maximumPoolSizeListener;
     public final String name;
     private final String mbeanName;
-    public final int maxTasksQueued;
-    private final ThreadPoolMetrics metrics;
+    public final ThreadPoolMetrics metrics;
 
     // stores both a set of work permits and task permits:
     //  bottom 32 bits are number of queued tasks, in the range [0..maxTasksQueued]   (initially 0)
     //  top 32 bits are number of work permits available in the range [-resizeDelta..maximumPoolSize]   (initially maximumPoolSize)
     private final AtomicLong permits = new AtomicLong();
 
-    // producers wait on this when there is no room on the queue
-    private final WaitQueue hasRoom = new WaitQueue();
     private final AtomicLong completedTasks = new AtomicLong();
 
     volatile boolean shuttingDown = false;
@@ -61,14 +57,13 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
     // TODO: see if other queue implementations might improve throughput
     protected final ConcurrentLinkedQueue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<>();
 
-    SEPExecutor(SharedExecutorPool pool, int maximumPoolSize, MaximumPoolSizeListener maximumPoolSizeListener, int maxTasksQueued, String jmxPath, String name)
+    SEPExecutor(SharedExecutorPool pool, int maximumPoolSize, MaximumPoolSizeListener maximumPoolSizeListener, String jmxPath, String name)
     {
         this.pool = pool;
         this.name = name;
         this.mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + name;
         this.maximumPoolSize = new AtomicInteger(maximumPoolSize);
         this.maximumPoolSizeListener = maximumPoolSizeListener;
-        this.maxTasksQueued = maxTasksQueued;
         this.permits.set(combine(0, maximumPoolSize));
         this.metrics = new ThreadPoolMetrics(this, jmxPath, name).register();
         MBeanWrapper.instance.registerMBean(this, mbeanName);
@@ -82,7 +77,7 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
     @Override
     public int getMaxTasksQueued()
     {
-        return maxTasksQueued;
+        return Integer.MAX_VALUE;
     }
 
     // schedules another worker for this pool if there is work outstanding and there are no spinning threads that
@@ -121,29 +116,6 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
             // worker, we simply start a worker in a spinning state
             pool.maybeStartSpinningWorker();
         }
-        else if (taskPermits >= maxTasksQueued)
-        {
-            // register to receive a signal once a task is processed bringing the queue below its threshold
-            WaitQueue.Signal s = hasRoom.register();
-
-            // we will only be signalled once the queue drops below full, so this creates equivalent external behaviour
-            // however the advantage is that we never wake-up spuriously;
-            // we choose to always sleep, even if in the intervening time the queue has dropped below limit,
-            // so long as we _will_ eventually receive a signal
-            if (taskPermits(permits.get()) > maxTasksQueued)
-            {
-                // if we're blocking, we might as well directly schedule a worker if we aren't already at max
-                if (takeWorkPermit(true))
-                    pool.schedule(new Work(this));
-
-                metrics.totalBlocked.inc();
-                metrics.currentBlocked.inc();
-                s.awaitUninterruptibly();
-                metrics.currentBlocked.dec();
-            }
-            else // don't propagate our signal when we cancel, just cancel
-                s.cancel();
-        }
     }
 
     public enum TakeTaskPermitResult
@@ -181,8 +153,6 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
             }
             if (permits.compareAndSet(current, updated))
             {
-                if (taskPermits == maxTasksQueued && hasRoom.hasWaiters())
-                    hasRoom.signalAll();
                 return result;
             }
         }
@@ -201,8 +171,6 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService implements SE
                 return false;
             if (permits.compareAndSet(current, combine(taskPermits - taskDelta, workPermits - 1)))
             {
-                if (takeTaskPermit && taskPermits == maxTasksQueued && hasRoom.hasWaiters())
-                    hasRoom.signalAll();
                 return true;
             }
         }
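
The permits field in the hunks above packs two counters into a single
AtomicLong so one compareAndSet can update both at once. A standalone sketch
of that encoding (helper names mirror those in the diff; an illustration, not
the exact Cassandra source):

    final class Permits
    {
        // bottom 32 bits: queued tasks, always >= 0
        // top 32 bits:    available work permits, may go negative while resizing
        static long combine(long taskPermits, long workPermits)
        {
            return (workPermits << 32) | (taskPermits & 0xFFFFFFFFL);
        }

        static int taskPermits(long combined)
        {
            return (int) (combined & 0xFFFFFFFFL);
        }

        static int workPermits(long combined)
        {
            return (int) (combined >> 32);   // arithmetic shift preserves the sign
        }
    }

Packing both counts into one long is what lets the CAS loops above adjust task
and work permits atomically with a single compareAndSet.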
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 28a994c..7a07cf4 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -108,14 +108,14 @@ public class SharedExecutorPool
             schedule(Work.SPINNING);
     }
 
-    public synchronized LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
+    public synchronized LocalAwareExecutorService newExecutor(int maxConcurrency, String jmxPath, String name)
     {
-        return newExecutor(maxConcurrency, i -> {}, maxQueuedTasks, jmxPath, name);
+        return newExecutor(maxConcurrency, i -> {}, jmxPath, name);
     }
 
-    public LocalAwareExecutorService newExecutor(int maxConcurrency, LocalAwareExecutorService.MaximumPoolSizeListener maximumPoolSizeListener, int maxQueuedTasks, String jmxPath, String name)
+    public LocalAwareExecutorService newExecutor(int maxConcurrency, LocalAwareExecutorService.MaximumPoolSizeListener maximumPoolSizeListener, String jmxPath, String name)
     {
-        SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maximumPoolSizeListener, maxQueuedTasks, jmxPath, name);
+        SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maximumPoolSizeListener, jmxPath, name);
         executors.add(executor);
         return executor;
     }
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index d2ebe8d..d4b214c 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -191,7 +191,7 @@ public enum Stage
 
     static LocalAwareExecutorService multiThreadedLowSignalStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
     {
-        return SharedExecutorPool.SHARED.newExecutor(numThreads, onSetMaximumPoolSize, Integer.MAX_VALUE, jmxType, jmxName);
+        return SharedExecutorPool.SHARED.newExecutor(numThreads, onSetMaximumPoolSize, jmxType, jmxName);
     }
 
     static LocalAwareExecutorService singleThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 17e67a2..09ea2e5 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -463,7 +463,6 @@ public abstract class Message
     {
         private static final LocalAwareExecutorService requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
                                                                                             DatabaseDescriptor::setNativeTransportMaxThreads,
-                                                                                            Integer.MAX_VALUE,
                                                                                             "transport",
                                                                                             "Native-Transport-Requests");
 
diff --git a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
index fe464c7..f6c5720 100644
--- a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
+++ b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -106,7 +106,7 @@ public class LongSharedExecutorPoolTest
     {
         final int executorCount = 4;
         int threadCount = 8;
-        int maxQueued = 1024;
+        int scale = 1024;
         final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
         final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
         final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
@@ -116,11 +116,11 @@ public class LongSharedExecutorPoolTest
         final ExecutorService[] executors = new ExecutorService[executorCount];
         for (int i = 0 ; i < executors.length ; i++)
         {
-            executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
+            executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, "test" + i, "test" + i);
             threadCounts[i] = threadCount;
-            workCount[i] = new WeibullDistribution(2, maxQueued);
+            workCount[i] = new WeibullDistribution(2, scale);
             threadCount *= 2;
-            maxQueued *= 2;
+            scale *= 2;
         }
 
         long runs = 0;
diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index 9a2d52d..a2fc238 100644
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -59,7 +59,7 @@ public class SEPExecutorTest
 
         for (int idx = 0; idx < 20; idx++)
         {
-            ExecutorService es = sharedPool.newExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, "STAGE", run + MAGIC + idx);
+            ExecutorService es = sharedPool.newExecutor(FBUtilities.getAvailableProcessors(), "STAGE", run + MAGIC + idx);
             // Write to black hole
             es.execute(() -> nullPrintSteam.println("TEST" + es));
         }
@@ -87,7 +87,7 @@ public class SEPExecutorTest
         SharedExecutorPool sharedPool = new SharedExecutorPool("ChangingMaxWorkersMeetsConcurrencyGoalsTest");
         final AtomicInteger notifiedMaxPoolSize = new AtomicInteger();
 
-        LocalAwareExecutorService executor = sharedPool.newExecutor(0, notifiedMaxPoolSize::set, 4, "internal", "resizetest");
+        LocalAwareExecutorService executor = sharedPool.newExecutor(0, notifiedMaxPoolSize::set, "internal", "resizetest");
 
         // Keep feeding the executor work while resizing
         // so it stays under load.
diff --git a/test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
new file mode 100644
index 0000000..d4201a0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        assertEquals(2, metrics.maxPoolSize.getValue().intValue());
+
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The thread pool has a size of 2, so the first 2 tasks should become active straight away
+        threadPool.execute(task1);
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+
+        // There are no threads available any more, so the next 2 tasks should go into the queue
+        threadPool.execute(task3);
+        threadPool.execute(task4);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // The queue is full, so the next 2 tasks should block their submitting threads and be counted as blocked
+        BlockingTask task5 = new BlockingTask();
+        BlockingTask task6 = new BlockingTask();
+
+        AtomicInteger blockedThreads = new AtomicInteger(0);
+        new Thread(() ->
+        {
+            blockedThreads.incrementAndGet();
+            threadPool.execute(task5);
+            blockedThreads.decrementAndGet();
+        }).start();
+
+        spinAssertEquals(1, () -> blockedThreads.get(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(1L, () -> metrics.totalBlocked.getCount(), 1);
+
+        new Thread(() ->
+        {
+            blockedThreads.incrementAndGet();
+            threadPool.execute(task6);
+            blockedThreads.decrementAndGet();
+        }).start();
+
+        spinAssertEquals(2, () -> blockedThreads.get(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing first task to complete
+        task1.allowToComplete();
+
+        spinAssertEquals(true, () -> task3.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+        spinAssertEquals(1, () -> blockedThreads.get(), 1);
+
+        // Allowing second task to complete
+        task2.allowToComplete();
+
+        spinAssertEquals(true, () -> task4.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+        spinAssertEquals(0, () -> blockedThreads.get(), 1);
+
+        // Allowing third task to complete
+        task3.allowToComplete();
+
+        spinAssertEquals(true, () -> task5.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(3L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing fourth task to complete
+        task4.allowToComplete();
+
+        spinAssertEquals(true, () -> task6.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(4L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing last tasks to complete
+        task5.allowToComplete();
+        task6.allowToComplete();
+
+        spinAssertEquals(0, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(6L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+    }
+
+    public void testMetricsWithNoBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The thread pool has a size of 2, so the first 2 tasks should become active straight away
+        threadPool.execute(task1);
+
+        spinAssertEquals(1, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // There are no threads available any more, so the next 2 tasks should go into the queue
+        threadPool.execute(task3);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        threadPool.execute(task4);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing first task to complete
+        task1.allowToComplete();
+
+        spinAssertEquals(true, () -> task3.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing second task to complete
+        task2.allowToComplete();
+
+        spinAssertEquals(true, () -> task4.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing third task to complete
+        task3.allowToComplete();
+
+        spinAssertEquals(1, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(3L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing fourth task to complete
+        task4.allowToComplete();
+
+        spinAssertEquals(0, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(4L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+    }
+
+    private class BlockingTask implements Runnable
+    {
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        private volatile boolean started;
+
+        public boolean isStarted()
+        {
+            return started;
+        }
+
+        public void allowToComplete()
+        {
+            latch.countDown();
+        }
+
+        @Override
+        public void run()
+        {
+            started = true;
+            try
+            {
+                latch.await(30, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
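
The blocked-producer assertions in testMetricsWithBlockedThreads above rely on
the executor parking the submitting thread when its bounded queue is full. One
common way to get that policy from a plain ThreadPoolExecutor is a rejection
handler that falls back to a blocking put() (a hypothetical sketch, not the
Cassandra implementation):

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    class BlockWhenFullPolicy implements RejectedExecutionHandler
    {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
        {
            try
            {
                // put() parks the caller until one queue slot frees up
                executor.getQueue().put(task);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }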



[cassandra] 02/02: CIRCLE-CI do not merge

Posted by bl...@apache.org.

blerer pushed a commit to branch CASSANDRA-16186
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 5b7a828819b2a838ad05a989c8e71ff49dc403ab
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Tue Oct 13 15:12:57 2020 +0200

    CIRCLE-CI do not merge
---
 .circleci/config.yml | 82 ++++++++++++++++++++++++++--------------------------
 1 file changed, 41 insertions(+), 41 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index fcd2615..42f363f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -3,10 +3,10 @@ jobs:
   j8_jvm_upgrade_dtests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 1
+    parallelism: 10
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -94,10 +94,10 @@ jobs:
   j8_cqlsh-dtests-py2-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -174,7 +174,7 @@ jobs:
     resource_class: medium
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 25
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -263,10 +263,10 @@ jobs:
   j8_cqlsh-dtests-py38-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -340,10 +340,10 @@ jobs:
   j11_cqlsh-dtests-py3-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -418,10 +418,10 @@ jobs:
   j11_cqlsh-dtests-py3-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -496,10 +496,10 @@ jobs:
   j11_cqlsh-dtests-py38-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -574,10 +574,10 @@ jobs:
   j8_cqlsh-dtests-py3-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -651,10 +651,10 @@ jobs:
   j8_cqlsh-dtests-py2-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -728,10 +728,10 @@ jobs:
   j11_cqlsh-dtests-py2-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -806,10 +806,10 @@ jobs:
   j11_dtests-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -887,10 +887,10 @@ jobs:
   j8_dtests-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -945,10 +945,10 @@ jobs:
   j8_upgradetests-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: xlarge
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 100
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1092,7 +1092,7 @@ jobs:
     resource_class: medium
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 25
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1353,10 +1353,10 @@ jobs:
   j11_cqlsh-dtests-py2-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1431,10 +1431,10 @@ jobs:
   j8_dtests-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1489,10 +1489,10 @@ jobs:
   j11_cqlsh-dtests-py38-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1567,10 +1567,10 @@ jobs:
   j8_jvm_dtests:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 1
+    parallelism: 10
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1738,10 +1738,10 @@ jobs:
   j8_cqlsh-dtests-py3-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1815,10 +1815,10 @@ jobs:
   j8_cqlsh-dtests-py38-with-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11-w-dependencies:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -1982,10 +1982,10 @@ jobs:
   j11_dtests-no-vnodes:
     docker:
     - image: nastra/cassandra-testing-ubuntu1910-java11:20200603
-    resource_class: medium
+    resource_class: large
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 50
     steps:
     - attach_workspace:
         at: /home/cassandra
@@ -2066,7 +2066,7 @@ jobs:
     resource_class: medium
     working_directory: ~/
     shell: /bin/bash -eo pipefail -l
-    parallelism: 4
+    parallelism: 25
     steps:
     - attach_workspace:
         at: /home/cassandra


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org