You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/04/02 21:57:33 UTC
[1/8] git commit: Handle short slice start/finishes of different lengths
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 56d84a7c0 -> 4f47a44d0
refs/heads/cassandra-2.1 e34d1af96 -> c6dbba1bb
refs/heads/trunk c7324473f -> 6fe526a56
Handle short slice start/finishes of different lengths
Patch by Tyler Hobbs; reviewed by Sylvain Lebresne as a follow-up for
CASSANDRA-6825
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f4f74177
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f4f74177
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f4f74177
Branch: refs/heads/trunk
Commit: f4f7417735e179c73334ecfb3df6aeb1467b6842
Parents: 5aafa98
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Wed Apr 2 14:27:35 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Apr 2 14:29:44 2014 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/marshal/CompositeType.java | 8 +++++---
.../org/apache/cassandra/db/marshal/CompositeTypeTest.java | 4 ++++
2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f4f74177/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 7f08219..32fc432 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -278,16 +278,18 @@ public class CompositeType extends AbstractCompositeType
// We could safely return true here, but there's a minor optimization: if the first component is restricted
// to a single value, we can check that the second component falls within the min/max for that component
// (and repeat for all components).
- for (int i = 0; i < Math.min(Math.min(start.length, finish.length), minColumnNames.size()); i++)
+ for (int i = 0; i < minColumnNames.size(); i++)
{
AbstractType<?> t = types.get(i);
+ ByteBuffer s = i < start.length ? start[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ ByteBuffer f = i < finish.length ? finish[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
// we already know the first component falls within its min/max range (otherwise we wouldn't get here)
- if (i > 0 && !t.intersects(minColumnNames.get(i), maxColumnNames.get(i), start[i], finish[i]))
+ if (i > 0 && !t.intersects(minColumnNames.get(i), maxColumnNames.get(i), s, f))
continue outer;
// if this component isn't equal in the start and finish, we don't need to check any more
- if (t.compare(start[i], finish[i]) != 0)
+ if (i >= start.length || i >= finish.length || t.compare(s, f) != 0)
break;
}
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f4f74177/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
index 20cb5ef..df6f5e1 100644
--- a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
@@ -458,6 +458,10 @@ public class CompositeTypeTest extends SchemaLoader
filter = new SliceQueryFilter(composite(1, 2), composite(1, 3), false, 1);
assertFalse(comparator.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), filter));
+ // same case, but with missing start and end components and different lengths for start and end
+ filter = new SliceQueryFilter(composite(1, 2), composite(1), false, 1);
+ assertFalse(comparator.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), filter));
+
// same as the previous set of tests, but the second component is equal in the slice start and end
filter = new SliceQueryFilter(composite(1, 2, 0), composite(1, 2, 0), false, 1);
[4/8] git commit: Revert "Don't shut ExpiringMap down."
Posted by br...@apache.org.
Revert "Don't shut ExpiringMap down."
This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d
Branch: refs/heads/cassandra-2.0
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/net/MessagingService.java | 13 +++++-
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
.../apache/cassandra/service/RemoveTest.java | 1 +
4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
2.0.7
* Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
* (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
* Fix NPE in MeteredFlusher (CASSANDRA-6820)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void listen(InetAddress localEp) throws ConfigurationException
{
+ callbacks.reset(); // hack to allow tests to stop/restart MS
for (ServerSocket ss : getServerSockets(localEp))
{
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
- ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
- ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
subscribers.add(subcriber);
}
+ public void clearCallbacksUnsafe()
+ {
+ callbacks.reset();
+ }
+
/**
* Wait for callbacks and don't allow any more to be created (since they could require writing hints)
*/
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
+ // the important part
+ callbacks.shutdownBlocking();
+
// attempt to humor tests that try to stop and restart MS
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+ private volatile boolean shutdown;
public static class CacheableObject<T>
{
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
{
return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
}
-
- public String toString()
- {
- return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
- }
}
// if we use more ExpiringMaps we may want to add multiple threads to this executor
private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
- private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
private final long defaultExpiration;
public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
throw new IllegalArgumentException("Argument specified must be a positive number");
}
- Runnable reaperTask = new Runnable()
+ Runnable runnable = new Runnable()
{
public void run()
{
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
logger.trace("Expired {} entries", n);
}
};
- service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+ service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
}
- public CacheableObject<V> put(K key, V value)
+ public void shutdownBlocking()
+ {
+ service.shutdown();
+ try
+ {
+ service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ public void reset()
+ {
+ shutdown = false;
+ cache.clear();
+ }
+
+ public V put(K key, V value)
{
return put(key, value, this.defaultExpiration);
}
- public CacheableObject<V> put(K key, V value, long timeout)
+ public V put(K key, V value, long timeout)
{
- return cache.put(key, new CacheableObject<V>(value, timeout));
+ if (shutdown)
+ {
+ // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+ // So we'll just sit on this thread until the rest of the server shutdown completes.
+ //
+ // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+ Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+ return (previous == null) ? null : previous.value;
}
public V get(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
public void tearDown()
{
SinkManager.clear();
+ MessagingService.instance().clearCallbacksUnsafe();
MessagingService.instance().shutdown();
}
[7/8] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6fe526a5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6fe526a5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6fe526a5
Branch: refs/heads/trunk
Commit: 6fe526a56eeaa340ea2b319a54cb4fca21440bf2
Parents: c732447 c6dbba1
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:52 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:52 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/filter/ColumnSlice.java | 9 ++--
.../apache/cassandra/net/MessagingService.java | 13 +++++-
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
.../cassandra/db/filter/ColumnSliceTest.java | 4 ++
.../apache/cassandra/service/RemoveTest.java | 1 +
6 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fe526a5/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fe526a5/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
[5/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6dbba1b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6dbba1b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6dbba1b
Branch: refs/heads/trunk
Commit: c6dbba1bbd2cf52bf3173c920e4526e9365a9d42
Parents: e34d1af 4f47a44
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:42 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:42 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/net/MessagingService.java | 13 +++++-
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
.../apache/cassandra/service/RemoveTest.java | 1 +
4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d5c8372,9003309..7da222e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,46 -1,5 +1,45 @@@
-2.0.7
+2.1.0-beta2
+ * Fail write instead of logging a warning when unable to append to CL
+ (CASSANDRA-6764)
+ * Eliminate possibility of CL segment appearing twice in active list
+ (CASSANDRA-6557)
+ * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
+ * Switch CRC component to Adler and include it for compressed sstables
+ (CASSANDRA-4165)
+ * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
+ * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
+ * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
+ * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
+ * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
+ * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
+ * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
+ * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
+ * Scrub should not always clear out repaired status (CASSANDRA-5351)
+ * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
+ * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
+ * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
+ * Change caching option syntax (CASSANDRA-6745)
+ * Fix stress to do proper counter reads (CASSANDRA-6835)
+ * Fix help message for stress counter_write (CASSANDRA-6824)
+ * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
+ * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
+ * Fix race condition in Batch CLE (CASSANDRA-6860)
+ * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
+ * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
+ * Proper compare function for CollectionType (CASSANDRA-6783)
+ * Update native server to Netty 4 (CASSANDRA-6236)
+ * Fix off-by-one error in stress (CASSANDRA-6883)
+ * Make OpOrder AutoCloseable (CASSANDRA-6901)
+ * Remove sync repair JMX interface (CASSANDRA-6900)
+ * Add multiple memory allocation options for memtables (CASSANDRA-6689)
+ * Remove adjusted op rate from stress output (CASSANDRA-6921)
+ * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
+ * Serialize batchlog mutations with the version of the target node
+ (CASSANDRA-6931)
+ * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
+ * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
+Merged from 2.0:
* Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
* (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
* Fix NPE in MeteredFlusher (CASSANDRA-6820)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
[8/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/db/marshal/CompositeType.java
test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e34d1af9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e34d1af9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e34d1af9
Branch: refs/heads/trunk
Commit: e34d1af963b85c46afc973221a296498eaba8264
Parents: 5ab1a34 f4f7417
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Wed Apr 2 14:55:48 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Apr 2 14:55:48 2014 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/filter/ColumnSlice.java | 9 ++++++---
.../org/apache/cassandra/db/filter/ColumnSliceTest.java | 4 ++++
2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e34d1af9/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 3838ee5,9eff12a..227297e
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@@ -52,63 -47,20 +52,66 @@@ public class ColumnSlic
this.finish = finish;
}
- public boolean isAlwaysEmpty(AbstractType<?> comparator, boolean reversed)
+ public boolean isAlwaysEmpty(CellNameType comparator, boolean reversed)
{
- Comparator<ByteBuffer> orderedComparator = reversed ? comparator.reverseComparator : comparator;
- return (start.remaining() > 0 && finish.remaining() > 0 && orderedComparator.compare(start, finish) > 0);
+ Comparator<Composite> orderedComparator = reversed ? comparator.reverseComparator() : comparator;
+ return !start.isEmpty() && !finish.isEmpty() && orderedComparator.compare(start, finish) > 0;
}
- public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+ public boolean includes(Comparator<Composite> cmp, Composite name)
{
- return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
+ return cmp.compare(start, name) <= 0 && (finish.isEmpty() || cmp.compare(finish, name) >= 0);
}
- public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
+ public boolean isBefore(Comparator<Composite> cmp, Composite name)
{
- return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
+ return !finish.isEmpty() && cmp.compare(finish, name) < 0;
+ }
+
+ public boolean intersects(List<ByteBuffer> minCellNames, List<ByteBuffer> maxCellNames, CellNameType comparator, boolean reversed)
+ {
+ assert minCellNames.size() == maxCellNames.size();
+
+ Composite sStart = reversed ? finish : start;
+ Composite sEnd = reversed ? start : finish;
+
+ if (compare(sStart, maxCellNames, comparator, true) > 0 || compare(sEnd, minCellNames, comparator, false) < 0)
+ return false;
+
+ // We could safely return true here, but there's a minor optimization: if the first component is restricted
+ // to a single value, we can check that the second component falls within the min/max for that component
+ // (and repeat for all components).
- for (int i = 0; i < Math.min(Math.min(sStart.size(), sEnd.size()), minCellNames.size()); i++)
++ for (int i = 0; i < minCellNames.size(); i++)
+ {
+ AbstractType<?> t = comparator.subtype(i);
++ ByteBuffer s = i < sStart.size() ? sStart.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER;
++ ByteBuffer f = i < sEnd.size() ? sEnd.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER;
++
+ // we already know the first component falls within its min/max range (otherwise we wouldn't get here)
- if (i > 0 && (t.compare(sEnd.get(i), minCellNames.get(i)) < 0 || t.compare(sStart.get(i), maxCellNames.get(i)) > 0))
++ if (i > 0 && (t.compare(f, minCellNames.get(i)) < 0 || t.compare(s, maxCellNames.get(i)) > 0))
+ return false;
+
+ // if this component isn't equal in the start and finish, we don't need to check any more
- if (t.compare(sStart.get(i), sEnd.get(i)) != 0)
++ if (i >= sStart.size() || i >= sEnd.size() || t.compare(s, f) != 0)
+ break;
+ }
+
+ return true;
+ }
+
+ /** Helper method for intersects() */
+ private int compare(Composite sliceBounds, List<ByteBuffer> sstableBounds, CellNameType comparator, boolean isSliceStart)
+ {
+ for (int i = 0; i < sstableBounds.size(); i++)
+ {
+ if (i >= sliceBounds.size())
+ return isSliceStart ? -1 : 1;
+
+ int comparison = comparator.subtype(i).compare(sliceBounds.get(i), sstableBounds.get(i));
+ if (comparison != 0)
+ return comparison;
+ }
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e34d1af9/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
index 4718795,0000000..e2de0e6
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
@@@ -1,290 -1,0 +1,294 @@@
+/*
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * */
+package org.apache.cassandra.db.filter;
+
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnSliceTest
+{
+ @Test
+ public void testIntersectsSingleSlice()
+ {
+ List<AbstractType<?>> types = new ArrayList<>();
+ types.add(Int32Type.instance);
+ types.add(Int32Type.instance);
+ types.add(Int32Type.instance);
+ CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types);
+
+ // filter falls entirely before sstable
+ ColumnSlice slice = new ColumnSlice(composite(0, 0, 0), composite(1, 0, 0));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with empty start
+ slice = new ColumnSlice(composite(), composite(1, 0, 0));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with missing components for start
+ slice = new ColumnSlice(composite(0), composite(1, 0, 0));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with missing components for start and end
+ slice = new ColumnSlice(composite(0), composite(1, 0));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+
+ // end of slice matches start of sstable for the first component, but not the second component
+ slice = new ColumnSlice(composite(0, 0, 0), composite(1, 0, 0));
+ assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with missing components for start
+ slice = new ColumnSlice(composite(0), composite(1, 0, 0));
+ assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with missing components for start and end
+ slice = new ColumnSlice(composite(0), composite(1, 0));
+ assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, false));
+
+ // first two components match, but not the last
+ slice = new ColumnSlice(composite(0, 0, 0), composite(1, 1, 0));
+ assertFalse(slice.intersects(columnNames(1, 1, 1), columnNames(3, 1, 1), nameType, false));
+
+ // all three components in slice end match the start of the sstable
+ slice = new ColumnSlice(composite(0, 0, 0), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 1, 1), columnNames(3, 1, 1), nameType, false));
+
+
+ // filter falls entirely after sstable
+ slice = new ColumnSlice(composite(4, 0, 0), composite(4, 0, 0));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with empty end
+ slice = new ColumnSlice(composite(4, 0, 0), composite());
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with missing components for end
+ slice = new ColumnSlice(composite(4, 0, 0), composite(1));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+ // same case, but with missing components for start and end
+ slice = new ColumnSlice(composite(4, 0), composite(1));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, false));
+
+
+ // start of slice matches end of sstable for the first component, but not the second component
+ slice = new ColumnSlice(composite(1, 1, 1), composite(2, 0, 0));
+ assertFalse(slice.intersects(columnNames(0, 0, 0), columnNames(1, 0, 0), nameType, false));
+
+ // start of slice matches end of sstable for the first two components, but not the last component
+ slice = new ColumnSlice(composite(1, 1, 1), composite(2, 0, 0));
+ assertFalse(slice.intersects(columnNames(0, 0, 0), columnNames(1, 1, 0), nameType, false));
+
+ // all three components in the slice start match the end of the sstable
+ slice = new ColumnSlice(composite(1, 1, 1), composite(2, 0, 0));
+ assertTrue(slice.intersects(columnNames(0, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+
+ // slice covers entire sstable (with no matching edges)
+ slice = new ColumnSlice(composite(0, 0, 0), composite(2, 0, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // same case, but with empty ends
+ slice = new ColumnSlice(composite(), composite());
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // same case, but with missing components
+ slice = new ColumnSlice(composite(0), composite(2, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // slice covers entire sstable (with matching start)
+ slice = new ColumnSlice(composite(1, 0, 0), composite(2, 0, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // slice covers entire sstable (with matching end)
+ slice = new ColumnSlice(composite(0, 0, 0), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // slice covers entire sstable (with matching start and end)
+ slice = new ColumnSlice(composite(1, 0, 0), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+
+ // slice falls entirely within sstable (with matching start)
+ slice = new ColumnSlice(composite(1, 0, 0), composite(1, 1, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // same case, but with a missing end component
+ slice = new ColumnSlice(composite(1, 0, 0), composite(1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // slice falls entirely within sstable (with matching end)
+ slice = new ColumnSlice(composite(1, 1, 0), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+ // same case, but with a missing start component
+ slice = new ColumnSlice(composite(1, 1), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(1, 1, 1), nameType, false));
+
+
+ // slice falls entirely within sstable
+ slice = new ColumnSlice(composite(1, 1, 0), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
+
+ // same case, but with a missing start component
+ slice = new ColumnSlice(composite(1, 1), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
+
+ // same case, but with missing start and end components
+ slice = new ColumnSlice(composite(1), composite(1, 2));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
+
+ // slice falls entirely within sstable (slice start and end are the same)
+ slice = new ColumnSlice(composite(1, 1, 1), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, false));
+
+
+ // slice starts within sstable, empty end
+ slice = new ColumnSlice(composite(1, 1, 1), composite());
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // same case, but with missing end components
+ slice = new ColumnSlice(composite(1, 1, 1), composite(3));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // slice starts within sstable (matching sstable start), empty end
+ slice = new ColumnSlice(composite(1, 0, 0), composite());
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // same case, but with missing end components
+ slice = new ColumnSlice(composite(1, 0, 0), composite(3));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // slice starts within sstable (matching sstable end), empty end
+ slice = new ColumnSlice(composite(2, 0, 0), composite());
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // same case, but with missing end components
+ slice = new ColumnSlice(composite(2, 0, 0), composite(3));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+
+ // slice ends within sstable, empty start
+ slice = new ColumnSlice(composite(), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // same case, but with missing start components
+ slice = new ColumnSlice(composite(0), composite(1, 1, 1));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // slice ends within sstable (matching sstable start), empty start
+ slice = new ColumnSlice(composite(), composite(1, 0, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // same case, but with missing start components
+ slice = new ColumnSlice(composite(0), composite(1, 0, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // slice ends within sstable (matching sstable end), empty start
+ slice = new ColumnSlice(composite(), composite(2, 0, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+ // same case, but with missing start components
+ slice = new ColumnSlice(composite(0), composite(2, 0, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 0, 0), nameType, false));
+
+
+ // the slice technically falls within the sstable range, but since the first component is restricted to
+ // a single value, we can check that the second component does not fall within its min/max
+ slice = new ColumnSlice(composite(1, 2, 0), composite(1, 3, 0));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same case, but with a missing start component
+ slice = new ColumnSlice(composite(1, 2), composite(1, 3, 0));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same case, but with a missing end component
+ slice = new ColumnSlice(composite(1, 2, 0), composite(1, 3));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same case, but with missing start and end components
+ slice = new ColumnSlice(composite(1, 2), composite(1, 3));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
++ // same case, but with missing start and end components and different lengths for start and end
++ slice = new ColumnSlice(composite(1, 2), composite(1));
++ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
++
+
+ // same as the previous set of tests, but the second component is equal in the slice start and end
+ slice = new ColumnSlice(composite(1, 2, 0), composite(1, 2, 0));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same case, but with a missing start component
+ slice = new ColumnSlice(composite(1, 2), composite(1, 2, 0));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same case, but with a missing end component
+ slice = new ColumnSlice(composite(1, 2, 0), composite(1, 2));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same case, but with missing start and end components
+ slice = new ColumnSlice(composite(1, 2), composite(1, 2));
+ assertFalse(slice.intersects(columnNames(1, 0, 0), columnNames(2, 1, 0), nameType, false));
+
+ // same as the previous tests, but it's the third component that doesn't fit in its range this time
+ slice = new ColumnSlice(composite(1, 1, 2), composite(1, 1, 3));
+ assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(2, 2, 1), nameType, false));
+
+
+ // basic check on reversed slices
+ slice = new ColumnSlice(composite(1, 0, 0), composite(0, 0, 0));
+ assertFalse(slice.intersects(columnNames(2, 0, 0), columnNames(3, 0, 0), nameType, true));
+
+ slice = new ColumnSlice(composite(1, 0, 0), composite(0, 0, 0));
+ assertFalse(slice.intersects(columnNames(1, 1, 0), columnNames(3, 0, 0), nameType, true));
+
+ slice = new ColumnSlice(composite(1, 1, 1), composite(1, 1, 0));
+ assertTrue(slice.intersects(columnNames(1, 0, 0), columnNames(2, 2, 2), nameType, true));
+ }
+
+ private static Composite composite(Integer ... components)
+ {
+ List<AbstractType<?>> types = new ArrayList<>();
+ types.add(Int32Type.instance);
+ types.add(Int32Type.instance);
+ types.add(Int32Type.instance);
+ CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types);
+ return nameType.make(components);
+ }
+
+ private static List<ByteBuffer> columnNames(Integer ... components)
+ {
+ List<ByteBuffer> names = new ArrayList<>(components.length);
+ for (int component : components)
+ names.add(ByteBufferUtil.bytes(component));
+ return names;
+ }
+}
[6/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6dbba1b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6dbba1b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6dbba1b
Branch: refs/heads/cassandra-2.1
Commit: c6dbba1bbd2cf52bf3173c920e4526e9365a9d42
Parents: e34d1af 4f47a44
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:42 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:42 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/net/MessagingService.java | 13 +++++-
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
.../apache/cassandra/service/RemoveTest.java | 1 +
4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d5c8372,9003309..7da222e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,46 -1,5 +1,45 @@@
-2.0.7
+2.1.0-beta2
+ * Fail write instead of logging a warning when unable to append to CL
+ (CASSANDRA-6764)
+ * Eliminate possibility of CL segment appearing twice in active list
+ (CASSANDRA-6557)
+ * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
+ * Switch CRC component to Adler and include it for compressed sstables
+ (CASSANDRA-4165)
+ * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
+ * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
+ * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
+ * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
+ * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
+ * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
+ * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
+ * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
+ * Scrub should not always clear out repaired status (CASSANDRA-5351)
+ * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
+ * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
+ * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
+ * Change caching option syntax (CASSANDRA-6745)
+ * Fix stress to do proper counter reads (CASSANDRA-6835)
+ * Fix help message for stress counter_write (CASSANDRA-6824)
+ * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
+ * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
+ * Fix race condition in Batch CLE (CASSANDRA-6860)
+ * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
+ * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
+ * Proper compare function for CollectionType (CASSANDRA-6783)
+ * Update native server to Netty 4 (CASSANDRA-6236)
+ * Fix off-by-one error in stress (CASSANDRA-6883)
+ * Make OpOrder AutoCloseable (CASSANDRA-6901)
+ * Remove sync repair JMX interface (CASSANDRA-6900)
+ * Add multiple memory allocation options for memtables (CASSANDRA-6689)
+ * Remove adjusted op rate from stress output (CASSANDRA-6921)
+ * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
+ * Serialize batchlog mutations with the version of the target node
+ (CASSANDRA-6931)
+ * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
+ * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
+Merged from 2.0:
* Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
* (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
* Fix NPE in MeteredFlusher (CASSANDRA-6820)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6dbba1b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
[2/8] git commit: Revert "Don't shut ExpiringMap down."
Posted by br...@apache.org.
Revert "Don't shut ExpiringMap down."
This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d
Branch: refs/heads/cassandra-2.1
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/net/MessagingService.java | 13 +++++-
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
.../apache/cassandra/service/RemoveTest.java | 1 +
4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
2.0.7
* Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
* (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
* Fix NPE in MeteredFlusher (CASSANDRA-6820)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void listen(InetAddress localEp) throws ConfigurationException
{
+ callbacks.reset(); // hack to allow tests to stop/restart MS
for (ServerSocket ss : getServerSockets(localEp))
{
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
- ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
- ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
subscribers.add(subcriber);
}
+ public void clearCallbacksUnsafe()
+ {
+ callbacks.reset();
+ }
+
/**
* Wait for callbacks and don't allow any more to be created (since they could require writing hints)
*/
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
+ // the important part
+ callbacks.shutdownBlocking();
+
// attempt to humor tests that try to stop and restart MS
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+ private volatile boolean shutdown;
public static class CacheableObject<T>
{
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
{
return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
}
-
- public String toString()
- {
- return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
- }
}
// if we use more ExpiringMaps we may want to add multiple threads to this executor
private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
- private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
private final long defaultExpiration;
public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
throw new IllegalArgumentException("Argument specified must be a positive number");
}
- Runnable reaperTask = new Runnable()
+ Runnable runnable = new Runnable()
{
public void run()
{
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
logger.trace("Expired {} entries", n);
}
};
- service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+ service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
}
- public CacheableObject<V> put(K key, V value)
+ public void shutdownBlocking()
+ {
+ service.shutdown();
+ try
+ {
+ service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ public void reset()
+ {
+ shutdown = false;
+ cache.clear();
+ }
+
+ public V put(K key, V value)
{
return put(key, value, this.defaultExpiration);
}
- public CacheableObject<V> put(K key, V value, long timeout)
+ public V put(K key, V value, long timeout)
{
- return cache.put(key, new CacheableObject<V>(value, timeout));
+ if (shutdown)
+ {
+ // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+ // So we'll just sit on this thread until the rest of the server shutdown completes.
+ //
+ // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+ Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+ return (previous == null) ? null : previous.value;
}
public V get(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
public void tearDown()
{
SinkManager.clear();
+ MessagingService.instance().clearCallbacksUnsafe();
MessagingService.instance().shutdown();
}
[3/8] git commit: Revert "Don't shut ExpiringMap down."
Posted by br...@apache.org.
Revert "Don't shut ExpiringMap down."
This reverts commit d4ec31f21eb83502bd523c057bce2d3b249b3d80.
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f47a44d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f47a44d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f47a44d
Branch: refs/heads/trunk
Commit: 4f47a44d05a1c9b8e284c2d8c084edb7c0376d1d
Parents: 56d84a7
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:53:25 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:53:25 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/net/MessagingService.java | 13 +++++-
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++++++++++++-----
.../apache/cassandra/service/RemoveTest.java | 1 +
4 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d999e88..9003309 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
2.0.7
* Allow compaction of system tables during startup (CASSANDRA-6913)
- * Don't shut ExpiringMap down (CASSANDRA-6948)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
* (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
* Fix NPE in MeteredFlusher (CASSANDRA-6820)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 094e861..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,6 +392,7 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void listen(InetAddress localEp) throws ConfigurationException
{
+ callbacks.reset(); // hack to allow tests to stop/restart MS
for (ServerSocket ss : getServerSockets(localEp))
{
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -535,7 +536,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
- ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -544,7 +545,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
- ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -653,6 +654,11 @@ public final class MessagingService implements MessagingServiceMBean
subscribers.add(subcriber);
}
+ public void clearCallbacksUnsafe()
+ {
+ callbacks.reset();
+ }
+
/**
* Wait for callbacks and don't allow any more to be created (since they could require writing hints)
*/
@@ -662,6 +668,9 @@ public final class MessagingService implements MessagingServiceMBean
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
+ // the important part
+ callbacks.shutdownBlocking();
+
// attempt to humor tests that try to stop and restart MS
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index cbec808..7eec40e 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+ private volatile boolean shutdown;
public static class CacheableObject<T>
{
@@ -55,17 +55,12 @@ public class ExpiringMap<K, V>
{
return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
}
-
- public String toString()
- {
- return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
- }
}
// if we use more ExpiringMaps we may want to add multiple threads to this executor
private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
- private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
private final long defaultExpiration;
public ExpiringMap(long defaultExpiration)
@@ -86,7 +81,7 @@ public class ExpiringMap<K, V>
throw new IllegalArgumentException("Argument specified must be a positive number");
}
- Runnable reaperTask = new Runnable()
+ Runnable runnable = new Runnable()
{
public void run()
{
@@ -105,17 +100,45 @@ public class ExpiringMap<K, V>
logger.trace("Expired {} entries", n);
}
};
- service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+ service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
}
- public CacheableObject<V> put(K key, V value)
+ public void shutdownBlocking()
+ {
+ service.shutdown();
+ try
+ {
+ service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ public void reset()
+ {
+ shutdown = false;
+ cache.clear();
+ }
+
+ public V put(K key, V value)
{
return put(key, value, this.defaultExpiration);
}
- public CacheableObject<V> put(K key, V value, long timeout)
+ public V put(K key, V value, long timeout)
{
- return cache.put(key, new CacheableObject<V>(value, timeout));
+ if (shutdown)
+ {
+ // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
+ // So we'll just sit on this thread until the rest of the server shutdown completes.
+ //
+ // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
+ Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
+ return (previous == null) ? null : previous.value;
}
public V get(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f47a44d/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 82e5e9e..62dd636 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,6 +92,7 @@ public class RemoveTest
public void tearDown()
{
SinkManager.clear();
+ MessagingService.instance().clearCallbacksUnsafe();
MessagingService.instance().shutdown();
}