You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/11 19:48:52 UTC
git commit: prevent slow clients from postponing shutdown indefinitely
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3727
Updated Branches:
refs/heads/cassandra-1.0 d10da1552 -> 185eca5d1
prevent slow clients from postponing shutdown indefinitely
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3727
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/185eca5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/185eca5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/185eca5d
Branch: refs/heads/cassandra-1.0
Commit: 185eca5d1fa7e384bb888c144d06abbced0fd577
Parents: d10da15
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 11 11:44:32 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jan 11 12:48:40 2012 -0600
----------------------------------------------------------------------
.../concurrent/DebuggableThreadPoolExecutor.java | 2 +-
.../cassandra/concurrent/NamedThreadFactory.java | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 8 +++-
.../org/apache/cassandra/net/MessagingService.java | 10 +---
.../cassandra/service/AbstractCassandraDaemon.java | 8 ++-
.../apache/cassandra/service/StorageService.java | 9 ++--
.../cassandra/thrift/CustomTThreadPoolServer.java | 43 +++++++--------
.../org/apache/cassandra/utils/ExpiringMap.java | 17 ++++++
.../org/apache/cassandra/service/RemoveTest.java | 2 +-
9 files changed, 59 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 7a344d8..f111d37 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -108,7 +108,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
protected void onFinalRejection(Runnable task) {}
@Override
- public void afterExecute(Runnable r, Throwable t)
+ protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r,t);
logExceptionsAfterExecute(r, t);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 4cee8dc..a60a0d5 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -50,6 +50,7 @@ public class NamedThreadFactory implements ThreadFactory
String name = id + ":" + n.getAndIncrement();
Thread thread = new Thread(runnable, name);
thread.setPriority(priority);
+ thread.setDaemon(true);
return thread;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index fdafc6d..412b800 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
import org.apache.cassandra.db.commitlog.ReplayPosition;
@@ -57,7 +58,12 @@ public class Memtable
// we're careful to only allow one count to run at a time because counting is slow
// (can be minutes, for a large memtable and a busy server), so we could keep memtables
// alive after they're flushed and would otherwise be GC'd.
- private static final ExecutorService meterExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
+ private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.MILLISECONDS,
+ new SynchronousQueue<Runnable>(),
+ new NamedThreadFactory("MemoryMeter"))
{
@Override
protected void afterExecute(Runnable r, Throwable t)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1526fa3..9ff110e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -475,15 +475,9 @@ public final class MessagingService implements MessagingServiceMBean
}
/**
- * There isn't a good way to shut down the MessagingService. One problem (but not the only one)
- * is that StorageProxy has no way to communicate back to clients, "I'm nominally alive, but I can't
- * send that request to the nodes with your data." Neither TimedOut nor Unavailable is appropriate
- * to return in that situation.
- *
- * So instead of shutting down MS and letting StorageProxy/clients cope somehow, we shut down
- * the Thrift service and then wait for all the outstanding requests to finish or timeout.
+ * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
*/
- public void waitForCallbacks()
+ public void shutdown()
{
logger_.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
index 1a3ebc9..028f82f 100644
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.gms.Gossiper;
import org.apache.log4j.PropertyConfigurator;
@@ -391,13 +392,16 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
/**
* A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool
* interface (for integration with Avro), and performs ClientState cleanup.
+ *
+ * (Note that the tasks being executed perform their own while-command-process
+ * loop until the client disconnects.)
*/
public static class CleaningThreadPool extends ThreadPoolExecutor
{
private ThreadLocal<ClientState> state;
public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads)
{
- super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+ super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift"));
this.state = state;
}
@@ -408,7 +412,5 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
state.get().logout();
}
-
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9c1195d..33f58a0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -323,7 +323,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
daemon.startRPCServer();
}
- // should only be called via JMX
public void stopRPCServer()
{
if (daemon == null)
@@ -347,7 +346,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.unregister(migrationManager);
Gossiper.instance.unregister(this);
Gossiper.instance.stop();
- MessagingService.instance().waitForCallbacks();
+ MessagingService.instance().shutdown();
// give it a second so that task accepted before the MessagingService shutdown gets submitted to the stage (to avoid RejectedExecutionException)
try { Thread.sleep(1000L); } catch (InterruptedException e) {}
StageManager.shutdownNow();
@@ -449,7 +448,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// In-progress writes originating here could generate hints to be written, so shut down MessagingService
// before mutation stage, so we can get all the hints saved before shutting down
- MessagingService.instance().waitForCallbacks();
+ MessagingService.instance().shutdown();
mutationStage.shutdown();
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
@@ -2110,7 +2109,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public void run()
{
Gossiper.instance.stop();
- MessagingService.instance().waitForCallbacks();
+ MessagingService.instance().shutdown();
StageManager.shutdownNow();
setMode(Mode.DECOMMISSIONED, true);
// let op be responsible for killing the process
@@ -2512,7 +2511,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.stop();
setMode(Mode.DRAINING, "shutting down MessageService", false);
- MessagingService.instance().waitForCallbacks();
+ MessagingService.instance().shutdown();
setMode(Mode.DRAINING, "waiting for streaming", false);
MessagingService.instance().waitForStreaming();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index c9a5f5b..161ff12 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -119,27 +119,24 @@ public class CustomTThreadPoolServer extends TServer
}
executorService_.shutdown();
-
- // Loop until awaitTermination finally does return without a interrupted
- // exception. If we don't do this, then we'll shut down prematurely. We want
- // to let the executorService clear it's task queue, closing client sockets
- // appropriately.
- long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
- long now = System.currentTimeMillis();
- while (timeoutMS >= 0)
- {
- try
- {
- executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
- break;
- }
- catch (InterruptedException ix)
- {
- long newnow = System.currentTimeMillis();
- timeoutMS -= (newnow - now);
- now = newnow;
- }
- }
+ // Thrift's default shutdown waits for the WorkerProcess threads to complete. We do not,
+ // because doing that allows a client to hold our shutdown "hostage" by simply not sending
+ // another message after stop is called (since process will block indefinitely trying to read
+ // the next message header).
+ //
+ // The "right" fix would be to update thrift to set a socket timeout on client connections
+ // (and tolerate unintentional timeouts until stopped_ is set). But this requires deep
+ // changes to the code generator, so simply setting these threads to daemon (in our custom
+ // CleaningThreadPool) and ignoring them after shutdown is good enough.
+ //
+ // Remember, our goal on shutdown is not necessarily that each client request we receive
+ // gets answered first [to do that, you should redirect clients to a different coordinator
+ // first], but rather (1) to make sure that for each update we ack as successful, we generate
+ // hints for any non-responsive replicas, and (2) to make sure that we quickly stop
+ // accepting client connections so shutdown can continue. Not waiting for the WorkerProcess
+ // threads here accomplishes (2); MessagingService's shutdown method takes care of (1).
+ //
+ // See CASSANDRA-3335 and CASSANDRA-3727.
}
public void stop()
@@ -184,7 +181,9 @@ public class CustomTThreadPoolServer extends TServer
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
- // down. this is necessary for graceful shutdown.
+ // down. this is necessary for graceful shutdown. (but not sufficient,
+ // since process() can take arbitrarily long waiting for client input.
+ // See comments at the end of serve().)
while (!stopped_ && processor.process(inputProtocol, outputProtocol))
{
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index ffd3c2e..0672259 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -30,6 +30,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+ private volatile boolean shutdown;
private static class CacheableObject<T>
{
@@ -104,6 +105,7 @@ public class ExpiringMap<K, V>
public void shutdown()
{
+ shutdown = true;
while (!cache.isEmpty())
{
logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size());
@@ -131,6 +133,21 @@ public class ExpiringMap<K, V>
public V put(K key, V value, long timeout)
{
+ if (shutdown)
+ {
+ // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+ // So we'll just sit on this thread until the rest of the server shutdown completes.
+ //
+ // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+ try
+ {
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
return (previous == null) ? null : previous.getValue();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 3034ab3..7ee7d69 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -85,7 +85,7 @@ public class RemoveTest extends CleanupHelper
{
SinkManager.clear();
MessagingService.instance().clearCallbacksUnsafe();
- MessagingService.instance().waitForCallbacks();
+ MessagingService.instance().shutdown();
ss.setPartitionerUnsafe(oldPartitioner);
}