You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/04/02 21:57:35 UTC

[3/8] git commit: Revert "Don't shut ExpiringMap down."

Revert "Don't shut ExpiringMap down."

This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d

Branch: refs/heads/trunk
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
 2.0.7
  * Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
+        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
+    public void clearCallbacksUnsafe()
+    {
+        callbacks.reset();
+    }
+
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
+        // the important part
+        callbacks.shutdownBlocking();
+
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
-
-        public String toString()
-        {
-            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
-        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable reaperTask = new Runnable()
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public CacheableObject<V> put(K key, V value)
+    public void shutdownBlocking()
+    {
+        service.shutdown();
+        try
+        {
+            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void reset()
+    {
+        shutdown = false;
+        cache.clear();
+    }
+
+    public V put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public CacheableObject<V> put(K key, V value, long timeout)
+    public V put(K key, V value, long timeout)
     {
-        return cache.put(key, new CacheableObject<V>(value, timeout));
+        if (shutdown)
+        {
+            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+            // So we'll just sit on this thread until the rest of the server shutdown completes.
+            //
+            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        }
+        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+        return (previous == null) ? null : previous.value;
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
+        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }