You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/04/02 21:19:49 UTC

[1/6] git commit: Don't shut ExpiringMap down.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 17278b3a3 -> d4ec31f21
  refs/heads/cassandra-2.1 f7e690d7f -> 8234bc15f
  refs/heads/trunk 5b85be04c -> 90387f63b


Don't shut ExpiringMap down.

Patch by Benedict, reviewed by brandonwilliams for CASSANDRA-6948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4ec31f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4ec31f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4ec31f2

Branch: refs/heads/cassandra-2.0
Commit: d4ec31f21eb83502bd523c057bce2d3b249b3d80
Parents: 17278b3
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:05:14 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:14:30 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 13 +-----
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
 .../apache/cassandra/service/RemoveTest.java    |  1 -
 4 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 66196d0..9bbcf07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ad86bbd..094e861 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,7 +392,6 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
-        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -536,7 +535,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -545,7 +544,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -654,11 +653,6 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
-    public void clearCallbacksUnsafe()
-    {
-        callbacks.reset();
-    }
-
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -668,9 +662,6 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
-        // the important part
-        callbacks.shutdownBlocking();
-
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 7eec40e..cbec808 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -35,7 +36,6 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
-    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,12 +55,17 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
+
+        public String toString()
+        {
+            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
+        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -81,7 +86,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable runnable = new Runnable()
+        Runnable reaperTask = new Runnable()
         {
             public void run()
             {
@@ -100,45 +105,17 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public void shutdownBlocking()
-    {
-        service.shutdown();
-        try
-        {
-            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void reset()
-    {
-        shutdown = false;
-        cache.clear();
-    }
-
-    public V put(K key, V value)
+    public CacheableObject<V> put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public V put(K key, V value, long timeout)
+    public CacheableObject<V> put(K key, V value, long timeout)
     {
-        if (shutdown)
-        {
-            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
-            // So we'll just sit on this thread until the rest of the server shutdown completes.
-            //
-            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
-            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-        }
-        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
-        return (previous == null) ? null : previous.value;
+        return cache.put(key, new CacheableObject<V>(value, timeout));
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 62dd636..82e5e9e 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,7 +92,6 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
-        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }
 


[6/6] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by br...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90387f63
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90387f63
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90387f63

Branch: refs/heads/trunk
Commit: 90387f63bc908b5ed91b47ee33454e85a98886de
Parents: 5b85be0 8234bc1
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:16:07 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:16:07 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/net/MessagingService.java  | 13 +-----
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
 .../apache/cassandra/service/RemoveTest.java    |  1 -
 4 files changed, 16 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90387f63/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90387f63/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------


[5/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8234bc15
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8234bc15
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8234bc15

Branch: refs/heads/cassandra-2.1
Commit: 8234bc15fdc63095d0457c9bf6757a750fbbf4f2
Parents: f7e690d d4ec31f
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:15:41 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:15:41 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 13 +-----
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
 .../apache/cassandra/service/RemoveTest.java    |  1 -
 4 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8234bc15/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index dc493dc,9bbcf07..db734b2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,44 -1,5 +1,45 @@@
 -2.0.7
 +2.1.0-beta2
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 +Merged from 2.0:
+  * Don't shut ExpiringMap down (CASSANDRA-6948)
   * Restrict Windows to parallel repairs (CASSANDRA-6907)
   * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
   * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8234bc15/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8234bc15/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------


[3/6] git commit: Don't shut ExpiringMap down.

Posted by br...@apache.org.
Don't shut ExpiringMap down.

Patch by Benedict, reviewed by brandonwilliams for CASSANDRA-6948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4ec31f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4ec31f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4ec31f2

Branch: refs/heads/trunk
Commit: d4ec31f21eb83502bd523c057bce2d3b249b3d80
Parents: 17278b3
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:05:14 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:14:30 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 13 +-----
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
 .../apache/cassandra/service/RemoveTest.java    |  1 -
 4 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 66196d0..9bbcf07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ad86bbd..094e861 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,7 +392,6 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
-        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -536,7 +535,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -545,7 +544,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -654,11 +653,6 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
-    public void clearCallbacksUnsafe()
-    {
-        callbacks.reset();
-    }
-
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -668,9 +662,6 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
-        // the important part
-        callbacks.shutdownBlocking();
-
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 7eec40e..cbec808 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -35,7 +36,6 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
-    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,12 +55,17 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
+
+        public String toString()
+        {
+            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
+        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -81,7 +86,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable runnable = new Runnable()
+        Runnable reaperTask = new Runnable()
         {
             public void run()
             {
@@ -100,45 +105,17 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public void shutdownBlocking()
-    {
-        service.shutdown();
-        try
-        {
-            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void reset()
-    {
-        shutdown = false;
-        cache.clear();
-    }
-
-    public V put(K key, V value)
+    public CacheableObject<V> put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public V put(K key, V value, long timeout)
+    public CacheableObject<V> put(K key, V value, long timeout)
     {
-        if (shutdown)
-        {
-            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
-            // So we'll just sit on this thread until the rest of the server shutdown completes.
-            //
-            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
-            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-        }
-        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
-        return (previous == null) ? null : previous.value;
+        return cache.put(key, new CacheableObject<V>(value, timeout));
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 62dd636..82e5e9e 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,7 +92,6 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
-        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }
 


[4/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8234bc15
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8234bc15
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8234bc15

Branch: refs/heads/trunk
Commit: 8234bc15fdc63095d0457c9bf6757a750fbbf4f2
Parents: f7e690d d4ec31f
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:15:41 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:15:41 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 13 +-----
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
 .../apache/cassandra/service/RemoveTest.java    |  1 -
 4 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8234bc15/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index dc493dc,9bbcf07..db734b2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,44 -1,5 +1,45 @@@
 -2.0.7
 +2.1.0-beta2
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 +Merged from 2.0:
+  * Don't shut ExpiringMap down (CASSANDRA-6948)
   * Restrict Windows to parallel repairs (CASSANDRA-6907)
   * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
   * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8234bc15/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8234bc15/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------


[2/6] git commit: Don't shut ExpiringMap down.

Posted by br...@apache.org.
Don't shut ExpiringMap down.

Patch by Benedict, reviewed by brandonwilliams for CASSANDRA-6948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4ec31f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4ec31f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4ec31f2

Branch: refs/heads/cassandra-2.1
Commit: d4ec31f21eb83502bd523c057bce2d3b249b3d80
Parents: 17278b3
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:05:14 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:14:30 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 13 +-----
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
 .../apache/cassandra/service/RemoveTest.java    |  1 -
 4 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 66196d0..9bbcf07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ad86bbd..094e861 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,7 +392,6 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
-        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -536,7 +535,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -545,7 +544,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -654,11 +653,6 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
-    public void clearCallbacksUnsafe()
-    {
-        callbacks.reset();
-    }
-
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -668,9 +662,6 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
-        // the important part
-        callbacks.shutdownBlocking();
-
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 7eec40e..cbec808 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -35,7 +36,6 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
-    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,12 +55,17 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
+
+        public String toString()
+        {
+            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
+        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -81,7 +86,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable runnable = new Runnable()
+        Runnable reaperTask = new Runnable()
         {
             public void run()
             {
@@ -100,45 +105,17 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public void shutdownBlocking()
-    {
-        service.shutdown();
-        try
-        {
-            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void reset()
-    {
-        shutdown = false;
-        cache.clear();
-    }
-
-    public V put(K key, V value)
+    public CacheableObject<V> put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public V put(K key, V value, long timeout)
+    public CacheableObject<V> put(K key, V value, long timeout)
     {
-        if (shutdown)
-        {
-            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
-            // So we'll just sit on this thread until the rest of the server shutdown completes.
-            //
-            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
-            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-        }
-        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
-        return (previous == null) ? null : previous.value;
+        return cache.put(key, new CacheableObject<V>(value, timeout));
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 62dd636..82e5e9e 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,7 +92,6 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
-        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }