You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/12/05 14:42:53 UTC
cassandra git commit: Make sure that SEP Worker shuts down when pool
is shutdown
Repository: cassandra
Updated Branches:
refs/heads/trunk f54ac59ac -> eea68a2cf
Make sure that SEP Worker shuts down when pool is shutdown
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14815
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eea68a2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eea68a2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eea68a2c
Branch: refs/heads/trunk
Commit: eea68a2cfeb0134510deaaa5540afdf6d0c6ee7e
Parents: f54ac59
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Oct 11 18:23:20 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Dec 5 15:42:30 2018 +0100
----------------------------------------------------------------------
.../apache/cassandra/concurrent/SEPWorker.java | 17 ++++-
.../concurrent/SharedExecutorPool.java | 35 +++++++---
.../apache/cassandra/distributed/Instance.java | 2 +-
.../cassandra/concurrent/SEPExecutorTest.java | 70 ++++++++++++++++++++
4 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/src/java/org/apache/cassandra/concurrent/SEPWorker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 4549b48..d998ab7 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -74,9 +74,14 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
{
while (true)
{
+ if (pool.shuttingDown)
+ return;
+
if (isSpinning() && !selfAssign())
{
doWaitSpin();
+ // if the pool is terminating but we have been assigned STOP_SIGNALLED, we must re-check (at the top of
+ // the loop) whether the pool is shutting down; otherwise this thread will go to sleep and block forever
continue;
}
@@ -118,8 +123,12 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
// return our work permit, and maybe signal shutdown
assigned.returnWorkPermit();
- if (shutdown && assigned.getActiveTaskCount() == 0)
- assigned.shutdown.signalAll();
+ if (shutdown)
+ {
+ if (assigned.getActiveTaskCount() == 0)
+ assigned.shutdown.signalAll();
+ return;
+ }
assigned = null;
// try to immediately reassign ourselves some work; if we fail, start spinning
@@ -169,7 +178,11 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
// if we're being descheduled, place ourselves in the descheduled collection
if (work.isStop())
+ {
pool.descheduled.put(workerId, this);
+ if (pool.shuttingDown)
+ return true;
+ }
// if we're currently stopped, and the new state is not a stop signal
// (which we can immediately convert to stopped), unpark the worker
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 5352ad7..62bede9 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -24,8 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.locks.LockSupport;
import static org.apache.cassandra.concurrent.SEPWorker.Work;
@@ -77,6 +76,8 @@ public class SharedExecutorPool
// the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();
+ volatile boolean shuttingDown = false;
+
public SharedExecutorPool(String poolName)
{
this.poolName = poolName;
@@ -113,13 +114,31 @@ public class SharedExecutorPool
return executor;
}
- @VisibleForTesting
- public static void shutdownSharedPool() throws InterruptedException
+ public void shutdown() throws InterruptedException
{
- for (SEPExecutor executor : SHARED.executors)
- executor.shutdown();
+ shuttingDown = true;
+ for (SEPExecutor executor : executors)
+ executor.shutdownNow();
+
+ terminateWorkers();
+
+ long until = System.nanoTime() + TimeUnit.MINUTES.toNanos(1L);
+ for (SEPExecutor executor : executors)
+ executor.shutdown.await(until - System.nanoTime(), TimeUnit.NANOSECONDS);
+ }
+
+ void terminateWorkers()
+ {
+ assert shuttingDown;
+
+ // To terminate our workers, we only need to unpark each thread to make it runnable again,
+ // so that the pool.shuttingDown boolean is checked. If work was already in the process
+ // of being scheduled, the worker will terminate upon running the task.
+ Map.Entry<Long, SEPWorker> e;
+ while (null != (e = descheduled.pollFirstEntry()))
+ e.getValue().assign(Work.SPINNING, false);
- for (SEPExecutor executor : SHARED.executors)
- executor.awaitTermination(60, TimeUnit.SECONDS);
+ while (null != (e = spinning.pollFirstEntry()))
+ LockSupport.unpark(e.getValue().thread);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/test/distributed/org/apache/cassandra/distributed/Instance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java b/test/distributed/org/apache/cassandra/distributed/Instance.java
index f9ee5bb..f344411 100644
--- a/test/distributed/org/apache/cassandra/distributed/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -345,7 +345,7 @@ public class Instance extends InvokableInstance
BufferPool::shutdownLocalCleaner,
Ref::shutdownReferenceReaper,
StageManager::shutdownAndWait,
- SharedExecutorPool::shutdownSharedPool,
+ SharedExecutorPool.SHARED::shutdown,
Memtable.MEMORY_POOL::shutdown,
ScheduledExecutors::shutdownAndWait);
error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea68a2c/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
new file mode 100644
index 0000000..011a8ba
--- /dev/null
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SEPExecutorTest
+{
+ @Test
+ public void shutdownTest() throws Throwable
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ shutdownOnce(i);
+ }
+ }
+
+ private static void shutdownOnce(int run) throws Throwable
+ {
+ SharedExecutorPool sharedPool = new SharedExecutorPool("SharedPool");
+ String MAGIC = "UNREPEATABLE_MAGIC_STRING";
+ OutputStream nullOutputStream = new OutputStream() {
+ public void write(int b) { }
+ };
+ PrintStream nullPrintSteam = new PrintStream(nullOutputStream);
+
+ for (int idx = 0; idx < 20; idx++)
+ {
+ ExecutorService es = sharedPool.newExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, "STAGE", run + MAGIC + idx);
+ // Write to black hole
+ es.execute(() -> nullPrintSteam.println("TEST" + es));
+ }
+
+ // shutdown does not guarantee that threads are actually dead once it exits, only that they will stop promptly afterwards
+ sharedPool.shutdown();
+ for (Thread thread : Thread.getAllStackTraces().keySet())
+ {
+ if (thread.getName().contains(MAGIC))
+ {
+ thread.join(100);
+ if (thread.isAlive())
+ Assert.fail(thread + " is still running " + Arrays.toString(thread.getStackTrace()));
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org