You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/04/02 21:57:33 UTC

[1/8] git commit: Handle short slice start/finishes of different lengths

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 56d84a7c0 -> 4f47a44d0
  refs/heads/cassandra-2.1 e34d1af96 -> c6dbba1bb
  refs/heads/trunk c7324473f -> 6fe526a56


Handle short slice start/finishes of different lengths

Patch by Tyler Hobbs; reviewed by Sylvain Lebresne as a follow-up for
CASSANDRA-6825


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f4f74177
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f4f74177
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f4f74177

Branch: refs/heads/trunk
Commit: f4f7417735e179c73334ecfb3df6aeb1467b6842
Parents: 5aafa98
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Wed Apr 2 14:27:35 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Apr 2 14:29:44 2014 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/marshal/CompositeType.java  | 8 +++++---
 .../org/apache/cassandra/db/marshal/CompositeTypeTest.java   | 4 ++++
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f4f74177/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 7f08219..32fc432 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -278,16 +278,18 @@ public class CompositeType extends AbstractCompositeType
             // We could safely return true here, but there's a minor optimization: if the first component is restricted
             // to a single value, we can check that the second component falls within the min/max for that component
             // (and repeat for all components).
-            for (int i = 0; i < Math.min(Math.min(start.length, finish.length), minColumnNames.size()); i++)
+            for (int i = 0; i < minColumnNames.size(); i++)
             {
                 AbstractType<?> t = types.get(i);
+                ByteBuffer s = i < start.length ? start[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                ByteBuffer f = i < finish.length ? finish[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
                 // we already know the first component falls within its min/max range (otherwise we wouldn't get here)
-                if (i > 0 && !t.intersects(minColumnNames.get(i), maxColumnNames.get(i), start[i], finish[i]))
+                if (i > 0 && !t.intersects(minColumnNames.get(i), maxColumnNames.get(i), s, f))
                     continue outer;
 
                 // if this component isn't equal in the start and finish, we don't need to check any more
-                if (t.compare(start[i], finish[i]) != 0)
+                if (i >= start.length || i >= finish.length || t.compare(s, f) != 0)
                     break;
             }
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f4f74177/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
index 20cb5ef..df6f5e1 100644
--- a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
@@ -458,6 +458,10 @@ public class CompositeTypeTest extends SchemaLoader
         filter = new SliceQueryFilter(composite(1, 2), composite(1, 3), false, 1);
         assertFalse(comparator.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), filter));
 
+        // same case, but with missing start and end components and different lengths for start and end
+        filter = new SliceQueryFilter(composite(1, 2), composite(1), false, 1);
+        assertFalse(comparator.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), filter));
+
 
         // same as the previous set of tests, but the second component is equal in the slice start and end
         filter = new SliceQueryFilter(composite(1, 2, 0), composite(1, 2, 0), false, 1);


[4/8] git commit: Revert "Don't shut ExpiringMap down."

Posted by br...@apache.org.
Revert "Don't shut ExpiringMap down."

This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d

Branch: refs/heads/cassandra-2.0
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
 2.0.7
  * Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
+        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
+    public void clearCallbacksUnsafe()
+    {
+        callbacks.reset();
+    }
+
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
+        // the important part
+        callbacks.shutdownBlocking();
+
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
-
-        public String toString()
-        {
-            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
-        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable reaperTask = new Runnable()
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public CacheableObject<V> put(K key, V value)
+    public void shutdownBlocking()
+    {
+        service.shutdown();
+        try
+        {
+            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void reset()
+    {
+        shutdown = false;
+        cache.clear();
+    }
+
+    public V put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public CacheableObject<V> put(K key, V value, long timeout)
+    public V put(K key, V value, long timeout)
     {
-        return cache.put(key, new CacheableObject<V>(value, timeout));
+        if (shutdown)
+        {
+            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+            // So we'll just sit on this thread until the rest of the server shutdown completes.
+            //
+            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        }
+        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+        return (previous == null) ? null : previous.value;
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
+        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }
 


[7/8] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by br...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6fe526a5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6fe526a5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6fe526a5

Branch: refs/heads/trunk
Commit: 6fe526a56eeaa340ea2b319a54cb4fca21440bf2
Parents: c732447 c6dbba1
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:52 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:52 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/db/filter/ColumnSlice.java |  9 ++--
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../cassandra/db/filter/ColumnSliceTest.java    |  4 ++
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 6 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fe526a5/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fe526a5/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------


[5/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6dbba1b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6dbba1b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6dbba1b

Branch: refs/heads/trunk
Commit: c6dbba1bbd2cf52bf3173c920e4526e9365a9d42
Parents: e34d1af 4f47a44
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:42 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:42 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d5c8372,9003309..7da222e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,46 -1,5 +1,45 @@@
 -2.0.7
 +2.1.0-beta2
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 +Merged from 2.0:
   * Allow compaction of system tables during startup (CASSANDRA-6913)
-  * Don't shut ExpiringMap down (CASSANDRA-6948)
   * Restrict Windows to parallel repairs (CASSANDRA-6907)
   * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
   * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------


[8/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/db/marshal/CompositeType.java
	test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e34d1af9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e34d1af9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e34d1af9

Branch: refs/heads/trunk
Commit: e34d1af963b85c46afc973221a296498eaba8264
Parents: 5ab1a34 f4f7417
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Wed Apr 2 14:55:48 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Apr 2 14:55:48 2014 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/filter/ColumnSlice.java    | 9 ++++++---
 .../org/apache/cassandra/db/filter/ColumnSliceTest.java     | 4 ++++
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e34d1af9/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 3838ee5,9eff12a..227297e
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@@ -52,63 -47,20 +52,66 @@@ public class ColumnSlic
          this.finish = finish;
      }
  
 -    public boolean isAlwaysEmpty(AbstractType<?> comparator, boolean reversed)
 +    public boolean isAlwaysEmpty(CellNameType comparator, boolean reversed)
      {
 -        Comparator<ByteBuffer> orderedComparator = reversed ? comparator.reverseComparator : comparator;
 -        return (start.remaining() > 0 && finish.remaining() > 0 && orderedComparator.compare(start, finish) > 0);
 +        Comparator<Composite> orderedComparator = reversed ? comparator.reverseComparator() : comparator;
 +        return !start.isEmpty() && !finish.isEmpty() && orderedComparator.compare(start, finish) > 0;
      }
  
 -    public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
 +    public boolean includes(Comparator<Composite> cmp, Composite name)
      {
 -        return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
 +        return cmp.compare(start, name) <= 0 && (finish.isEmpty() || cmp.compare(finish, name) >= 0);
      }
  
 -    public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
 +    public boolean isBefore(Comparator<Composite> cmp, Composite name)
      {
 -        return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
 +        return !finish.isEmpty() && cmp.compare(finish, name) < 0;
 +    }
 +
 +    public boolean intersects(List<ByteBuffer> minCellNames, List<ByteBuffer> maxCellNames, CellNameType comparator, boolean reversed)
 +    {
 +        assert minCellNames.size() == maxCellNames.size();
 +
 +        Composite sStart = reversed ? finish : start;
 +        Composite sEnd = reversed ? start : finish;
 +
 +        if (compare(sStart, maxCellNames, comparator, true) > 0 || compare(sEnd, minCellNames, comparator, false) < 0)
 +            return false;
 +
 +        // We could safely return true here, but there's a minor optimization: if the first component is restricted
 +        // to a single value, we can check that the second component falls within the min/max for that component
 +        // (and repeat for all components).
-         for (int i = 0; i < Math.min(Math.min(sStart.size(), sEnd.size()), minCellNames.size()); i++)
++        for (int i = 0; i < minCellNames.size(); i++)
 +        {
 +            AbstractType<?> t = comparator.subtype(i);
++            ByteBuffer s = i < sStart.size() ? sStart.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER;
++            ByteBuffer f = i < sEnd.size() ? sEnd.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER;
++
 +            // we already know the first component falls within its min/max range (otherwise we wouldn't get here)
-             if (i > 0 && (t.compare(sEnd.get(i), minCellNames.get(i)) < 0 || t.compare(sStart.get(i), maxCellNames.get(i)) > 0))
++            if (i > 0 && (t.compare(f, minCellNames.get(i)) < 0 || t.compare(s, maxCellNames.get(i)) > 0))
 +                return false;
 +
 +            // if this component isn't equal in the start and finish, we don't need to check any more
-             if (t.compare(sStart.get(i), sEnd.get(i)) != 0)
++            if (i >= sStart.size() || i >= sEnd.size() || t.compare(s, f) != 0)
 +                break;
 +        }
 +
 +        return true;
 +    }
 +
 +    /** Helper method for intersects() */
 +    private int compare(Composite sliceBounds, List<ByteBuffer> sstableBounds, CellNameType comparator, boolean isSliceStart)
 +    {
 +        for (int i = 0; i < sstableBounds.size(); i++)
 +        {
 +            if (i >= sliceBounds.size())
 +                return isSliceStart ? -1 : 1;
 +
 +            int comparison = comparator.subtype(i).compare(sliceBounds.get(i), sstableBounds.get(i));
 +            if (comparison != 0)
 +                return comparison;
 +        }
 +        return 0;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e34d1af9/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
index 4718795,0000000..e2de0e6
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
@@@ -1,290 -1,0 +1,294 @@@
 +/*
 + * * Licensed to the Apache Software Foundation (ASF) under one
 + * * or more contributor license agreements.  See the NOTICE file
 + * * distributed with this work for additional information
 + * * regarding copyright ownership.  The ASF licenses this file
 + * * to you under the Apache License, Version 2.0 (the
 + * * "License"); you may not use this file except in compliance
 + * * with the License.  You may obtain a copy of the License at
 + * *
 + * *    http://www.apache.org/licenses/LICENSE-2.0
 + * *
 + * * Unless required by applicable law or agreed to in writing,
 + * * software distributed under the License is distributed on an
 + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * * KIND, either express or implied.  See the License for the
 + * * specific language governing permissions and limitations
 + * * under the License.
 + * */
 +package org.apache.cassandra.db.filter;
 +
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.junit.Test;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class ColumnSliceTest
 +{
 +    @Test
 +    public void testIntersectsSingleSlice()
 +    {
 +        List<AbstractType<?>> types = new ArrayList<>();
 +        types.add(Int32Type.instance);
 +        types.add(Int32Type.instance);
 +        types.add(Int32Type.instance);
 +        CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types);
 +
 +        // filter falls entirely before sstable
 +        ColumnSlice slice = new ColumnSlice(composite(0, 0, 0), composite(1, 0, 0));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with empty start
 +        slice = new ColumnSlice(composite(), composite(1, 0, 0));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with missing components for start
 +        slice = new ColumnSlice(composite(0), composite(1, 0, 0));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with missing components for start and end
 +        slice = new ColumnSlice(composite(0), composite(1, 0));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +
 +        // end of slice matches start of sstable for the first component, but not the second component
 +        slice = new ColumnSlice(composite(0, 0, 0), composite(1, 0, 0));
 +        assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with missing components for start
 +        slice = new ColumnSlice(composite(0), composite(1, 0, 0));
 +        assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with missing components for start and end
 +        slice = new ColumnSlice(composite(0), composite(1, 0));
 +        assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // first two components match, but not the last
 +        slice = new ColumnSlice(composite(0, 0, 0), composite(1, 1, 0));
 +        assertFalse(slice.intersects(columnNames(1, 1, 1), columnNames(3, 1, 1), nameType, false));
 +
 +        // all three components in slice end match the start of the sstable
 +        slice = new ColumnSlice(composite(0, 0, 0), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 1, 1), columnNames(3, 1, 1), nameType, false));
 +
 +
 +        // filter falls entirely after sstable
 +        slice = new ColumnSlice(composite(4, 0, 0), composite(4, 0, 0));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with empty end
 +        slice = new ColumnSlice(composite(4, 0, 0), composite());
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with missing components for end
 +        slice = new ColumnSlice(composite(4, 0, 0), composite(1));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +        // same case, but with missing components for start and end
 +        slice = new ColumnSlice(composite(4, 0), composite(1));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
 +
 +
 +        // start of slice matches end of sstable for the first component, but not the second component
 +        slice = new ColumnSlice(composite(1, 1, 1), composite(2, 0, 0));
 +        assertFalse(slice.intersects(columnNames(0, 0, 0), columnNames(1, 0, 0), nameType, false));
 +
 +        // start of slice matches end of sstable for the first two components, but not the last component
 +        slice = new ColumnSlice(composite(1, 1, 1), composite(2, 0, 0));
 +        assertFalse(slice.intersects(columnNames(0, 0, 0), columnNames(1, 1, 0), nameType, false));
 +
 +        // all three components in the slice start match the end of the sstable
 +        slice = new ColumnSlice(composite(1, 1, 1), composite(2, 0, 0));
 +        assertTrue(slice.intersects(columnNames(0, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +
 +        // slice covers entire sstable (with no matching edges)
 +        slice = new ColumnSlice(composite(0, 0, 0), composite(2, 0, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // same case, but with empty ends
 +        slice = new ColumnSlice(composite(), composite());
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // same case, but with missing components
 +        slice = new ColumnSlice(composite(0), composite(2, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // slice covers entire sstable (with matching start)
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(2, 0, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // slice covers entire sstable (with matching end)
 +        slice = new ColumnSlice(composite(0, 0, 0), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // slice covers entire sstable (with matching start and end)
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +
 +        // slice falls entirely within sstable (with matching start)
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(1, 1, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // same case, but with a missing end component
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // slice falls entirely within sstable (with matching end)
 +        slice = new ColumnSlice(composite(1, 1, 0), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +        // same case, but with a missing start component
 +        slice = new ColumnSlice(composite(1, 1), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
 +
 +
 +        // slice falls entirely within sstable
 +        slice = new ColumnSlice(composite(1, 1, 0), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
 +
 +        // same case, but with a missing start component
 +        slice = new ColumnSlice(composite(1, 1), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
 +
 +        // same case, but with missing start and end components
 +        slice = new ColumnSlice(composite(1), composite(1, 2));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
 +
 +        // slice falls entirely within sstable (slice start and end are the same)
 +        slice = new ColumnSlice(composite(1, 1, 1), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
 +
 +
 +        // slice starts within sstable, empty end
 +        slice = new ColumnSlice(composite(1, 1, 1), composite());
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // same case, but with missing end components
 +        slice = new ColumnSlice(composite(1, 1, 1), composite(3));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // slice starts within sstable (matching sstable start), empty end
 +        slice = new ColumnSlice(composite(1, 0, 0), composite());
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // same case, but with missing end components
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(3));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // slice starts within sstable (matching sstable end), empty end
 +        slice = new ColumnSlice(composite(2, 0, 0), composite());
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // same case, but with missing end components
 +        slice = new ColumnSlice(composite(2, 0, 0), composite(3));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +
 +        // slice ends within sstable, empty start
 +        slice = new ColumnSlice(composite(), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // same case, but with missing start components
 +        slice = new ColumnSlice(composite(0), composite(1, 1, 1));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // slice ends within sstable (matching sstable start), empty start
 +        slice = new ColumnSlice(composite(), composite(1, 0, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // same case, but with missing start components
 +        slice = new ColumnSlice(composite(0), composite(1, 0, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // slice ends within sstable (matching sstable end), empty start
 +        slice = new ColumnSlice(composite(), composite(2, 0, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +        // same case, but with missing start components
 +        slice = new ColumnSlice(composite(0), composite(2, 0, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
 +
 +
 +        // the slice technically falls within the sstable range, but since the first component is restricted to
 +        // a single value, we can check that the second component does not fall within its min/max
 +        slice = new ColumnSlice(composite(1, 2, 0), composite(1, 3, 0));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same case, but with a missing start component
 +        slice = new ColumnSlice(composite(1, 2), composite(1, 3, 0));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same case, but with a missing end component
 +        slice = new ColumnSlice(composite(1, 2, 0), composite(1, 3));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same case, but with missing start and end components
 +        slice = new ColumnSlice(composite(1, 2), composite(1, 3));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
++        // same case, but with missing start and end components and different lengths for start and end
++        slice = new ColumnSlice(composite(1, 2), composite(1));
++        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
++
 +
 +        // same as the previous set of tests, but the second component is equal in the slice start and end
 +        slice = new ColumnSlice(composite(1, 2, 0), composite(1, 2, 0));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same case, but with a missing start component
 +        slice = new ColumnSlice(composite(1, 2), composite(1, 2, 0));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same case, but with a missing end component
 +        slice = new ColumnSlice(composite(1, 2, 0), composite(1, 2));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same case, but with missing start and end components
 +        slice = new ColumnSlice(composite(1, 2), composite(1, 2));
 +        assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
 +
 +        // same as the previous tests, but it's the third component that doesn't fit in its range this time
 +        slice = new ColumnSlice(composite(1, 1, 2), composite(1, 1, 3));
 +        assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(2, 2, 1), nameType, false));
 +
 +
 +        // basic check on reversed slices
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(0, 0, 0));
 +        assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, true));
 +
 +        slice = new ColumnSlice(composite(1, 0, 0), composite(0, 0, 0));
 +        assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, true));
 +
 +        slice = new ColumnSlice(composite(1, 1, 1), composite(1, 1, 0));
 +        assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, true));
 +    }
 +
 +    private static Composite composite(Integer ... components)
 +    {
 +        List<AbstractType<?>> types = new ArrayList<>();
 +        types.add(Int32Type.instance);
 +        types.add(Int32Type.instance);
 +        types.add(Int32Type.instance);
 +        CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types);
 +        return nameType.make(components);
 +    }
 +
 +    private static List<ByteBuffer> columnNames(Integer ... components)
 +    {
 +        List<ByteBuffer> names = new ArrayList<>(components.length);
 +        for (int component : components)
 +            names.add(ByteBufferUtil.bytes(component));
 +        return names;
 +    }
 +}


[6/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6dbba1b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6dbba1b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6dbba1b

Branch: refs/heads/cassandra-2.1
Commit: c6dbba1bbd2cf52bf3173c920e4526e9365a9d42
Parents: e34d1af 4f47a44
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:42 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:42 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d5c8372,9003309..7da222e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,46 -1,5 +1,45 @@@
 -2.0.7
 +2.1.0-beta2
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 +Merged from 2.0:
   * Allow compaction of system tables during startup (CASSANDRA-6913)
-  * Don't shut ExpiringMap down (CASSANDRA-6948)
   * Restrict Windows to parallel repairs (CASSANDRA-6907)
   * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
   * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------


[2/8] git commit: Revert "Don't shut ExpiringMap down."

Posted by br...@apache.org.
Revert "Don't shut ExpiringMap down."

This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d

Branch: refs/heads/cassandra-2.1
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
 2.0.7
  * Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
+        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
+    public void clearCallbacksUnsafe()
+    {
+        callbacks.reset();
+    }
+
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
+        // the important part
+        callbacks.shutdownBlocking();
+
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
-
-        public String toString()
-        {
-            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
-        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable reaperTask = new Runnable()
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public CacheableObject<V> put(K key, V value)
+    public void shutdownBlocking()
+    {
+        service.shutdown();
+        try
+        {
+            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void reset()
+    {
+        shutdown = false;
+        cache.clear();
+    }
+
+    public V put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public CacheableObject<V> put(K key, V value, long timeout)
+    public V put(K key, V value, long timeout)
     {
-        return cache.put(key, new CacheableObject<V>(value, timeout));
+        if (shutdown)
+        {
+            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+            // So we'll just sit on this thread until the rest of the server shutdown completes.
+            //
+            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        }
+        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+        return (previous == null) ? null : previous.value;
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
+        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }
 


[3/8] git commit: Revert "Don't shut ExpiringMap down."

Posted by br...@apache.org.
Revert "Don't shut ExpiringMap down."

This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d

Branch: refs/heads/trunk
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/net/MessagingService.java  | 13 +++++-
 .../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
 .../apache/cassandra/service/RemoveTest.java    |  1 +
 4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
 2.0.7
  * Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)
  * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
  * Fix NPE in MeteredFlusher (CASSANDRA-6820)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void listen(InetAddress localEp) throws ConfigurationException
     {
+        callbacks.reset(); // hack to allow tests to stop/restart MS
         for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
-        ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
         subscribers.add(subcriber);
     }
 
+    public void clearCallbacksUnsafe()
+    {
+        callbacks.reset();
+    }
+
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
+        // the important part
+        callbacks.shutdownBlocking();
+
         // attempt to humor tests that try to stop and restart MS
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+    private volatile boolean shutdown;
 
     public static class CacheableObject<T>
     {
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
         {
             return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
         }
-
-        public String toString()
-        {
-            return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
-        }
     }
 
     // if we use more ExpiringMaps we may want to add multiple threads to this executor
     private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
 
-    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        Runnable reaperTask = new Runnable()
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public CacheableObject<V> put(K key, V value)
+    public void shutdownBlocking()
+    {
+        service.shutdown();
+        try
+        {
+            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void reset()
+    {
+        shutdown = false;
+        cache.clear();
+    }
+
+    public V put(K key, V value)
     {
         return put(key, value, this.defaultExpiration);
     }
 
-    public CacheableObject<V> put(K key, V value, long timeout)
+    public V put(K key, V value, long timeout)
     {
-        return cache.put(key, new CacheableObject<V>(value, timeout));
+        if (shutdown)
+        {
+            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+            // So we'll just sit on this thread until the rest of the server shutdown completes.
+            //
+            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        }
+        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+        return (previous == null) ? null : previous.value;
     }
 
     public V get(K key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
     public void tearDown()
     {
         SinkManager.clear();
+        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }