You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/12/05 14:42:53 UTC

cassandra git commit: Make sure that SEP Worker shuts down when pool is shutdown

Repository: cassandra
Updated Branches:
  refs/heads/trunk f54ac59ac -> eea68a2cf


Make sure that SEP Worker shuts down when pool is shutdown

Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14815

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eea68a2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eea68a2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eea68a2c

Branch: refs/heads/trunk
Commit: eea68a2cfeb0134510deaaa5540afdf6d0c6ee7e
Parents: f54ac59
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Oct 11 18:23:20 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Dec 5 15:42:30 2018 +0100

----------------------------------------------------------------------
 .../apache/cassandra/concurrent/SEPWorker.java  | 17 ++++-
 .../concurrent/SharedExecutorPool.java          | 35 +++++++---
 .../apache/cassandra/distributed/Instance.java  |  2 +-
 .../cassandra/concurrent/SEPExecutorTest.java   | 70 ++++++++++++++++++++
 4 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/src/java/org/apache/cassandra/concurrent/SEPWorker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 4549b48..d998ab7 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -74,9 +74,14 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
         {
             while (true)
             {
+                if (pool.shuttingDown)
+                    return;
+
                 if (isSpinning() && !selfAssign())
                 {
                     doWaitSpin();
+                    // if the pool is terminating but we have been assigned STOP_SIGNALLED, we must re-check whether
+                    // the pool is shutting down; otherwise this thread will go to sleep and block forever
                     continue;
                 }
 
@@ -118,8 +123,12 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
 
                 // return our work permit, and maybe signal shutdown
                 assigned.returnWorkPermit();
-                if (shutdown && assigned.getActiveTaskCount() == 0)
-                    assigned.shutdown.signalAll();
+                if (shutdown)
+                {
+                    if (assigned.getActiveTaskCount() == 0)
+                        assigned.shutdown.signalAll();
+                    return;
+                }
                 assigned = null;
 
                 // try to immediately reassign ourselves some work; if we fail, start spinning
@@ -169,7 +178,11 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
 
             // if we're being descheduled, place ourselves in the descheduled collection
             if (work.isStop())
+            {
                 pool.descheduled.put(workerId, this);
+                if (pool.shuttingDown)
+                    return true;
+            }
 
             // if we're currently stopped, and the new state is not a stop signal
             // (which we can immediately convert to stopped), unpark the worker

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 5352ad7..62bede9 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -24,8 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.locks.LockSupport;
 
 import static org.apache.cassandra.concurrent.SEPWorker.Work;
 
@@ -77,6 +76,8 @@ public class SharedExecutorPool
     // the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
     final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();
 
+    volatile boolean shuttingDown = false;
+
     public SharedExecutorPool(String poolName)
     {
         this.poolName = poolName;
@@ -113,13 +114,31 @@ public class SharedExecutorPool
         return executor;
     }
 
-    @VisibleForTesting
-    public static void shutdownSharedPool() throws InterruptedException
+    public void shutdown() throws InterruptedException
     {
-        for (SEPExecutor executor : SHARED.executors)
-            executor.shutdown();
+        shuttingDown = true;
+        for (SEPExecutor executor : executors)
+            executor.shutdownNow();
+
+        terminateWorkers();
+
+        long until = System.nanoTime() + TimeUnit.MINUTES.toNanos(1L);
+        for (SEPExecutor executor : executors)
+            executor.shutdown.await(until - System.nanoTime(), TimeUnit.NANOSECONDS);
+    }
+
+    void terminateWorkers()
+    {
+        assert shuttingDown;
+
+        // To terminate our workers, we only need to unpark each thread to make it runnable again,
+        // so that the pool.shuttingDown boolean is checked. If work was already in the process
+        // of being scheduled, the worker will terminate upon running the task.
+        Map.Entry<Long, SEPWorker> e;
+        while (null != (e = descheduled.pollFirstEntry()))
+            e.getValue().assign(Work.SPINNING, false);
 
-        for (SEPExecutor executor : SHARED.executors)
-            executor.awaitTermination(60, TimeUnit.SECONDS);
+        while (null != (e = spinning.pollFirstEntry()))
+            LockSupport.unpark(e.getValue().thread);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/test/distributed/org/apache/cassandra/distributed/Instance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java b/test/distributed/org/apache/cassandra/distributed/Instance.java
index f9ee5bb..f344411 100644
--- a/test/distributed/org/apache/cassandra/distributed/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -345,7 +345,7 @@ public class Instance extends InvokableInstance
                     BufferPool::shutdownLocalCleaner,
                     Ref::shutdownReferenceReaper,
                     StageManager::shutdownAndWait,
-                    SharedExecutorPool::shutdownSharedPool,
+                    SharedExecutorPool.SHARED::shutdown,
                     Memtable.MEMORY_POOL::shutdown,
                     ScheduledExecutors::shutdownAndWait);
             error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
new file mode 100644
index 0000000..011a8ba
--- /dev/null
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SEPExecutorTest
+{
+    @Test
+    public void shutdownTest() throws Throwable
+    {
+        for (int i = 0; i < 1000; i++)
+        {
+            shutdownOnce(i);
+        }
+    }
+
+    private static void shutdownOnce(int run) throws Throwable
+    {
+        SharedExecutorPool sharedPool = new SharedExecutorPool("SharedPool");
+        String MAGIC = "UNREPEATABLE_MAGIC_STRING";
+        OutputStream nullOutputStream = new OutputStream() {
+            public void write(int b) { }
+        };
+        PrintStream nullPrintSteam = new PrintStream(nullOutputStream);
+
+        for (int idx = 0; idx < 20; idx++)
+        {
+            ExecutorService es = sharedPool.newExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, "STAGE", run + MAGIC + idx);
+            // Write to black hole
+            es.execute(() -> nullPrintSteam.println("TEST" + es));
+        }
+
+        // shutdown does not guarantee that threads are actually dead once it exits, only that they will stop promptly afterwards
+        sharedPool.shutdown();
+        for (Thread thread : Thread.getAllStackTraces().keySet())
+        {
+            if (thread.getName().contains(MAGIC))
+            {
+                thread.join(100);
+                if (thread.isAlive())
+                    Assert.fail(thread + " is still running " + Arrays.toString(thread.getStackTrace()));
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org