You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/11 19:48:52 UTC

git commit: prevent slow clients from postponing shutdown indefinitely; patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3727

Updated Branches:
  refs/heads/cassandra-1.0 d10da1552 -> 185eca5d1


prevent slow clients from postponing shutdown indefinitely
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3727


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/185eca5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/185eca5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/185eca5d

Branch: refs/heads/cassandra-1.0
Commit: 185eca5d1fa7e384bb888c144d06abbced0fd577
Parents: d10da15
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 11 11:44:32 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jan 11 12:48:40 2012 -0600

----------------------------------------------------------------------
 .../concurrent/DebuggableThreadPoolExecutor.java   |    2 +-
 .../cassandra/concurrent/NamedThreadFactory.java   |    1 +
 src/java/org/apache/cassandra/db/Memtable.java     |    8 +++-
 .../org/apache/cassandra/net/MessagingService.java |   10 +---
 .../cassandra/service/AbstractCassandraDaemon.java |    8 ++-
 .../apache/cassandra/service/StorageService.java   |    9 ++--
 .../cassandra/thrift/CustomTThreadPoolServer.java  |   43 +++++++--------
 .../org/apache/cassandra/utils/ExpiringMap.java    |   17 ++++++
 .../org/apache/cassandra/service/RemoveTest.java   |    2 +-
 9 files changed, 59 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 7a344d8..f111d37 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -108,7 +108,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     protected void onFinalRejection(Runnable task) {}
 
     @Override
-    public void afterExecute(Runnable r, Throwable t)
+    protected void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r,t);
         logExceptionsAfterExecute(r, t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 4cee8dc..a60a0d5 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -50,6 +50,7 @@ public class NamedThreadFactory implements ThreadFactory
         String name = id + ":" + n.getAndIncrement();
         Thread thread = new Thread(runnable, name);
         thread.setPriority(priority);
+        thread.setDaemon(true);
         return thread;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index fdafc6d..412b800 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
@@ -57,7 +58,12 @@ public class Memtable
     // we're careful to only allow one count to run at a time because counting is slow
     // (can be minutes, for a large memtable and a busy server), so we could keep memtables
     // alive after they're flushed and would otherwise be GC'd.
-    private static final ExecutorService meterExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
+    private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
+                                                                                          1,
+                                                                                          Integer.MAX_VALUE,
+                                                                                          TimeUnit.MILLISECONDS,
+                                                                                          new SynchronousQueue<Runnable>(),
+                                                                                          new NamedThreadFactory("MemoryMeter"))
     {
         @Override
         protected void afterExecute(Runnable r, Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1526fa3..9ff110e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -475,15 +475,9 @@ public final class MessagingService implements MessagingServiceMBean
     }
 
     /**
-     * There isn't a good way to shut down the MessagingService. One problem (but not the only one)
-     * is that StorageProxy has no way to communicate back to clients, "I'm nominally alive, but I can't
-     * send that request to the nodes with your data."  Neither TimedOut nor Unavailable is appropriate
-     * to return in that situation.
-     *
-     * So instead of shutting down MS and letting StorageProxy/clients cope somehow, we shut down
-     * the Thrift service and then wait for all the outstanding requests to finish or timeout.
+     * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
-    public void waitForCallbacks()
+    public void shutdown()
     {
         logger_.info("Waiting for messaging service to quiesce");
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
index 1a3ebc9..028f82f 100644
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.log4j.PropertyConfigurator;
@@ -391,13 +392,16 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
     /**
      * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool
      * interface (for integration with Avro), and performs ClientState cleanup.
+     *
+     * (Note that the tasks being executed perform their own while-command-process
+     * loop until the client disconnects.)
      */
     public static class CleaningThreadPool extends ThreadPoolExecutor 
     {
         private ThreadLocal<ClientState> state;
         public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads)
         {
-            super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+            super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift"));
             this.state = state;
         }
 
@@ -408,7 +412,5 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
             DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
             state.get().logout();
         }
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9c1195d..33f58a0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -323,7 +323,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         daemon.startRPCServer();
     }
 
-    // should only be called via JMX
     public void stopRPCServer()
     {
         if (daemon == null)
@@ -347,7 +346,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.unregister(migrationManager);
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
-        MessagingService.instance().waitForCallbacks();
+        MessagingService.instance().shutdown();
         // give it a second so that task accepted before the MessagingService shutdown gets submitted to the stage (to avoid RejectedExecutionException)
         try { Thread.sleep(1000L); } catch (InterruptedException e) {}
         StageManager.shutdownNow();
@@ -449,7 +448,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
                 // before mutation stage, so we can get all the hints saved before shutting down
-                MessagingService.instance().waitForCallbacks();
+                MessagingService.instance().shutdown();
                 mutationStage.shutdown();
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -2110,7 +2109,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             public void run()
             {
                 Gossiper.instance.stop();
-                MessagingService.instance().waitForCallbacks();
+                MessagingService.instance().shutdown();
                 StageManager.shutdownNow();
                 setMode(Mode.DECOMMISSIONED, true);
                 // let op be responsible for killing the process
@@ -2512,7 +2511,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.stop();
 
         setMode(Mode.DRAINING, "shutting down MessageService", false);
-        MessagingService.instance().waitForCallbacks();
+        MessagingService.instance().shutdown();
         setMode(Mode.DRAINING, "waiting for streaming", false);
         MessagingService.instance().waitForStreaming();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index c9a5f5b..161ff12 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -119,27 +119,24 @@ public class CustomTThreadPoolServer extends TServer
         }
 
         executorService_.shutdown();
-
-        // Loop until awaitTermination finally does return without a interrupted
-        // exception. If we don't do this, then we'll shut down prematurely. We want
-        // to let the executorService clear it's task queue, closing client sockets
-        // appropriately.
-        long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
-        long now = System.currentTimeMillis();
-        while (timeoutMS >= 0)
-        {
-            try
-            {
-                executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-                break;
-            }
-            catch (InterruptedException ix)
-            {
-                long newnow = System.currentTimeMillis();
-                timeoutMS -= (newnow - now);
-                now = newnow;
-            }
-        }
+        // Thrift's default shutdown waits for the WorkerProcess threads to complete.  We do not,
+        // because doing that allows a client to hold our shutdown "hostage" by simply not sending
+        // another message after stop is called (since process will block indefinitely trying to read
+        // the next message header).
+        //
+        // The "right" fix would be to update thrift to set a socket timeout on client connections
+        // (and tolerate unintentional timeouts until stopped_ is set).  But this requires deep
+        // changes to the code generator, so simply setting these threads to daemon (in our custom
+        // CleaningThreadPool) and ignoring them after shutdown is good enough.
+        //
+        // Remember, our goal on shutdown is not necessarily that each client request we receive
+        // gets answered first [to do that, you should redirect clients to a different coordinator
+        // first], but rather (1) to make sure that for each update we ack as successful, we generate
+        // hints for any non-responsive replicas, and (2) to make sure that we quickly stop
+        // accepting client connections so shutdown can continue.  Not waiting for the WorkerProcess
+        // threads here accomplishes (2); MessagingService's shutdown method takes care of (1).
+        //
+        // See CASSANDRA-3335 and CASSANDRA-3727.
     }
 
     public void stop()
@@ -184,7 +181,9 @@ public class CustomTThreadPoolServer extends TServer
                 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
                 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
                 // we check stopped_ first to make sure we're not supposed to be shutting
-                // down. this is necessary for graceful shutdown.
+                // down. this is necessary for graceful shutdown.  (but not sufficient,
+                // since process() can take arbitrarily long waiting for client input.
+                // See comments at the end of serve().)
                 while (!stopped_ && processor.process(inputProtocol, outputProtocol))
                 {
                     inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index ffd3c2e..0672259 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -30,6 +30,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+    private volatile boolean shutdown;
 
     private static class CacheableObject<T>
     {
@@ -104,6 +105,7 @@ public class ExpiringMap<K, V>
 
     public void shutdown()
     {
+        shutdown = true;
         while (!cache.isEmpty())
         {
             logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size());
@@ -131,6 +133,21 @@ public class ExpiringMap<K, V>
 
     public V put(K key, V value, long timeout)
     {
+        if (shutdown)
+        {
+            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+            // So we'll just sit on this thread until the rest of the server shutdown completes.
+            //
+            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+            try
+            {
+                Thread.sleep(Long.MAX_VALUE);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+        }
         CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
         return (previous == null) ? null : previous.getValue();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/185eca5d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 3034ab3..7ee7d69 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -85,7 +85,7 @@ public class RemoveTest extends CleanupHelper
     {
         SinkManager.clear();
         MessagingService.instance().clearCallbacksUnsafe();
-        MessagingService.instance().waitForCallbacks();
+        MessagingService.instance().shutdown();
         ss.setPartitionerUnsafe(oldPartitioner);
     }