You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/11/04 16:26:10 UTC

[GitHub] [cassandra] adelapena commented on a change in pull request #775: CASSANDRA-16186 (trunk): Remove the SEPExecutor blocking behavior

adelapena commented on a change in pull request #775:
URL: https://github.com/apache/cassandra/pull/775#discussion_r517315448



##########
File path: src/java/org/apache/cassandra/concurrent/SEPExecutor.java
##########
@@ -43,16 +42,13 @@
     private final MaximumPoolSizeListener maximumPoolSizeListener;
     public final String name;
     private final String mbeanName;
-    public final int maxTasksQueued;
-    private final ThreadPoolMetrics metrics;
+    public final ThreadPoolMetrics metrics;

Review comment:
       Nit: we could mark this field with `@VisibleForTesting`, since it appears to have been made public only so that the tests can read it.

##########
File path: test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)

Review comment:
       ```suggestion
       private static void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
       ```

##########
File path: test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        assertEquals(2, metrics.maxPoolSize.getValue().intValue());
+
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The ThreadPool has a size of 2 so the 2 first tasks should go into active straight away
+        threadPool.execute(task1);
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+
+        // There are no threads available any more the 2 next tasks should go into the queue
+        threadPool.execute(task3);
+        threadPool.execute(task4);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // The queue is full the 2 next task should go into blocked and block the thread
+        BlockingTask task5 = new BlockingTask();
+        BlockingTask task6 = new BlockingTask();
+
+        AtomicInteger blockedThreads = new AtomicInteger(0);
+        new Thread(() ->
+        {
+            blockedThreads.incrementAndGet();
+            threadPool.execute(task5);
+            blockedThreads.decrementAndGet();
+        }).start();
+
+        spinAssertEquals(1, () -> blockedThreads.get(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(1L, () -> metrics.totalBlocked.getCount(), 1);
+
+        new Thread(() ->
+        {
+            blockedThreads.incrementAndGet();
+            threadPool.execute(task6);
+            blockedThreads.decrementAndGet();
+        }).start();
+
+        spinAssertEquals(2, () -> blockedThreads.get(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing first task to complete
+        task1.allowToComplete();
+
+        spinAssertEquals(true, () -> task3.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+        spinAssertEquals(1, () -> blockedThreads.get(), 1);
+
+        // Allowing second task to complete
+        task2.allowToComplete();
+
+        spinAssertEquals(true, () -> task4.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+        spinAssertEquals(0, () -> blockedThreads.get(), 1);
+
+        // Allowing third task to complete
+        task3.allowToComplete();
+
+        spinAssertEquals(true, () -> task5.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(3L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing fourth task to complete
+        task4.allowToComplete();
+
+        spinAssertEquals(true, () -> task6.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(4L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing last tasks to complete
+        task5.allowToComplete();
+        task6.allowToComplete();
+
+        spinAssertEquals(0, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(6L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+    }
+
+    public void testMetricsWithNoBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The ThreadPool has a size of 2 so the 2 first tasks should go into active straight away
+        threadPool.execute(task1);
+
+        spinAssertEquals(1, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // There are no threads available any more the 2 next tasks should go into the queue
+        threadPool.execute(task3);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        threadPool.execute(task4);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing first task to complete
+        task1.allowToComplete();
+
+        spinAssertEquals(true, () -> task3.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing second task to complete
+        task2.allowToComplete();
+
+        spinAssertEquals(true, () -> task4.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing third task to complete
+        task3.allowToComplete();
+
+        spinAssertEquals(1, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(3L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing fourth task to complete
+        task4.allowToComplete();
+
+        spinAssertEquals(0, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(4L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+    }
+
+    private class BlockingTask implements Runnable

Review comment:
       ```suggestion
       private static class BlockingTask implements Runnable
       ```

##########
File path: test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        assertEquals(2, metrics.maxPoolSize.getValue().intValue());
+
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The ThreadPool has a size of 2 so the 2 first tasks should go into active straight away
+        threadPool.execute(task1);
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);

Review comment:
       We could also check here that the other metrics (`completedTasks`, `pendingTasks`, `currentBlocked` and `totalBlocked`) are still zero.

##########
File path: test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
##########
@@ -106,7 +106,7 @@ private void testPromptnessOfExecution(long intervalNanos, float loadIncrement)
     {
         final int executorCount = 4;
         int threadCount = 8;
-        int maxQueued = 1024;
+        int scale = 1024;

Review comment:
       If I'm right, this burn test is not included in the CircleCI results, so I have started a run in [ci-cassandra](https://ci-cassandra.apache.org/view/all/job/Cassandra-devbranch-test-burn/44/).

##########
File path: test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        assertEquals(2, metrics.maxPoolSize.getValue().intValue());
+
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The ThreadPool has a size of 2 so the 2 first tasks should go into active straight away
+        threadPool.execute(task1);
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+
+        // There are no threads available any more the 2 next tasks should go into the queue
+        threadPool.execute(task3);
+        threadPool.execute(task4);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // The queue is full the 2 next task should go into blocked and block the thread
+        BlockingTask task5 = new BlockingTask();
+        BlockingTask task6 = new BlockingTask();
+
+        AtomicInteger blockedThreads = new AtomicInteger(0);
+        new Thread(() ->
+        {
+            blockedThreads.incrementAndGet();
+            threadPool.execute(task5);
+            blockedThreads.decrementAndGet();
+        }).start();
+
+        spinAssertEquals(1, () -> blockedThreads.get(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(1L, () -> metrics.totalBlocked.getCount(), 1);
+
+        new Thread(() ->
+        {
+            blockedThreads.incrementAndGet();
+            threadPool.execute(task6);
+            blockedThreads.decrementAndGet();
+        }).start();
+
+        spinAssertEquals(2, () -> blockedThreads.get(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing first task to complete
+        task1.allowToComplete();
+
+        spinAssertEquals(true, () -> task3.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(1L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+        spinAssertEquals(1, () -> blockedThreads.get(), 1);
+
+        // Allowing second task to complete
+        task2.allowToComplete();
+
+        spinAssertEquals(true, () -> task4.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(2L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+        spinAssertEquals(0, () -> blockedThreads.get(), 1);
+
+        // Allowing third task to complete
+        task3.allowToComplete();
+
+        spinAssertEquals(true, () -> task5.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(3L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(1, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing fourth task to complete
+        task4.allowToComplete();
+
+        spinAssertEquals(true, () -> task6.isStarted(), 1);
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(4L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+
+        // Allowing last tasks to complete
+        task5.allowToComplete();
+        task6.allowToComplete();
+
+        spinAssertEquals(0, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(6L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(0, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(2L, () -> metrics.totalBlocked.getCount(), 1);
+    }
+
+    public void testMetricsWithNoBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)

Review comment:
       ```suggestion
       private static void testMetricsWithNoBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
       ```

##########
File path: test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        assertEquals(2, metrics.maxPoolSize.getValue().intValue());
+
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();
+
+        // The ThreadPool has a size of 2 so the 2 first tasks should go into active straight away
+        threadPool.execute(task1);
+        threadPool.execute(task2);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+
+        // There are no threads available any more the 2 next tasks should go into the queue
+        threadPool.execute(task3);
+        threadPool.execute(task4);
+
+        spinAssertEquals(2, () -> metrics.activeTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.completedTasks.getValue().longValue(), 1);
+        spinAssertEquals(2, () -> metrics.pendingTasks.getValue().intValue(), 1);
+        spinAssertEquals(0L, () -> metrics.currentBlocked.getCount(), 1);
+        spinAssertEquals(0L, () -> metrics.totalBlocked.getCount(), 1);

Review comment:
       Nit: These calls to `spinAssertEquals`, and the others below, can be slightly simplified to:
   ```suggestion
           spinAssertEquals(2, metrics.activeTasks::getValue, 1);
           spinAssertEquals(0L, metrics.completedTasks::getValue, 1);
           spinAssertEquals(2, metrics.pendingTasks::getValue, 1);
           spinAssertEquals(0L, metrics.currentBlocked::getCount, 1);
           spinAssertEquals(0L, metrics.totalBlocked::getCount, 1);
   ```
   Also, we always use a timeout of one second, so we could consider having a shortcut:
   ```java
   private static void spinAssertEquals(Object expected, Supplier<Object> actualSupplier)
   {
       Util.spinAssertEquals(expected, actualSupplier, 1);
   }
   ```

##########
File path: test/unit/org/apache/cassandra/metrics/ThreadPoolMetricsTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.*;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.junit.Assert.*;
+
+public class ThreadPoolMetricsTest
+{
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithNoBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-1"),
+                                                                                 "internal");
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testJMXEnabledThreadPoolMetricsWithBlockedThread()
+    {
+        JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(2,
+                                                                                 Integer.MAX_VALUE,
+                                                                                 TimeUnit.SECONDS,
+                                                                                 new ArrayBlockingQueue<>(2),
+                                                                                 new NamedThreadFactory("ThreadPoolMetricsTest-2"),
+                                                                                 "internal");
+        testMetricsWithBlockedThreads(executor, executor.metrics);
+    }
+
+    @Test
+    public void testSEPExecutorMetrics()
+    {
+        SEPExecutor executor = (SEPExecutor) new SharedExecutorPool("ThreadPoolMetricsTest-2").newExecutor(2,
+                                                                                                           "ThreadPoolMetricsTest-3",
+                                                                                                           "internal");
+
+        testMetricsWithNoBlockedThreads(executor, executor.metrics);
+    }
+
+    public void testMetricsWithBlockedThreads(LocalAwareExecutorService threadPool, ThreadPoolMetrics metrics)
+    {
+        assertEquals(2, metrics.maxPoolSize.getValue().intValue());
+
+        BlockingTask task1 = new BlockingTask();
+        BlockingTask task2 = new BlockingTask();
+        BlockingTask task3 = new BlockingTask();
+        BlockingTask task4 = new BlockingTask();

Review comment:
       We could verify that, at the start, all metrics are zero, just in case. Same for `testMetricsWithNoBlockedThreads`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org