You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/06 05:15:40 UTC
svn commit: r907172 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/
src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/streaming/
src/java/org/apache/cassandra/thrift/ test/unit/or...
Author: jbellis
Date: Sat Feb 6 04:15:39 2010
New Revision: 907172
URL: http://svn.apache.org/viewvc?rev=907172&view=rev
Log:
have RangeSliceCommand take Range or Bounds (client bounds, start-inclusive, non-wrapping)
patch by jbellis; reviewed by Stu Hood for CASSANDRA-763
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java (with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java (with props)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Feb 6 04:15:39 2010
@@ -39,6 +39,9 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
import org.apache.cassandra.io.util.FileUtils;
@@ -930,24 +933,22 @@
}
/**
- * @param startWith key to start with, inclusive. empty string = start at beginning.
- * @param stopAt key to stop at, inclusive. empty string = stop only when keys are exhausted.
+ * @param range: either a Bounds, which includes start key, or a Range, which does not.
* @param maxResults
- * @param includeStartKey
* @return list of keys between startWith and stopAt
TODO refactor better. this is just getKeyRange w/o the deletion check, for the benefit of
range_slice. still opens one randomaccessfile per key, which sucks. something like compactioniterator
would be better.
*/
- private boolean getKeyRange(List<String> keys, final DecoratedKey startWith, final DecoratedKey stopAt, int maxResults, boolean includeStartKey)
+ private boolean getKeyRange(List<String> keys, final AbstractBounds range, int maxResults)
throws IOException, ExecutionException, InterruptedException
{
- // getKeyRange requires start <= stop. getRangeSlice handles range wrapping if necessary.
- assert stopAt.isEmpty() || startWith.compareTo(stopAt) <= 0;
+ final DecoratedKey startWith = new DecoratedKey(range.left, null);
+ final DecoratedKey stopAt = new DecoratedKey(range.right, null);
// create a CollatedIterator that will return unique keys from different sources
// (current memtable, historical memtables, and SSTables) in the correct order.
- List<Iterator<DecoratedKey>> iterators = new ArrayList<Iterator<DecoratedKey>>();
+ final List<Iterator<DecoratedKey>> iterators = new ArrayList<Iterator<DecoratedKey>>();
// we iterate through memtables with a priority queue to avoid more sorting than necessary.
// this predicate throws out the keys before the start of our range.
@@ -1022,7 +1023,7 @@
return true;
}
- if (includeStartKey || !first || !current.equals(startWith))
+ if (range instanceof Bounds || !first || !current.equals(startWith))
{
keys.add(current.key);
}
@@ -1050,33 +1051,32 @@
/**
*
* @param super_column
- * @param startKey key to start at (inclusive). empty string = start at the beginning.
- * @param finishKey key to stop at (inclusive). empty string = stop at the end.
+ * @param range: either a Bounds, which includes start key, or a Range, which does not.
* @param keyMax maximum number of keys to process, regardless of startKey/finishKey
* @param sliceRange may be null if columnNames is specified. specifies contiguous columns to return in what order.
* @param columnNames may be null if sliceRange is specified. specifies which columns to return in what order. @return list of key->list<column> tuples.
- * @param includeStartKey
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
- public RangeSliceReply getRangeSlice(byte[] super_column, final DecoratedKey startKey, final DecoratedKey finishKey, int keyMax, SliceRange sliceRange, List<byte[]> columnNames, boolean includeStartKey)
+ public RangeSliceReply getRangeSlice(byte[] super_column, final AbstractBounds range, int keyMax, SliceRange sliceRange, List<byte[]> columnNames)
throws IOException, ExecutionException, InterruptedException
{
List<String> keys = new ArrayList<String>();
boolean completed;
- if (finishKey.isEmpty() || startKey.compareTo(finishKey) <= 0)
+ if ((range instanceof Bounds || !((Range)range).isWrapAround()))
{
- completed = getKeyRange(keys, startKey, finishKey, keyMax, includeStartKey);
+ completed = getKeyRange(keys, range, keyMax);
}
else
{
// wrapped range
- DecoratedKey emptyKey = new DecoratedKey(StorageService.getPartitioner().getMinimumToken(), null);
- completed = getKeyRange(keys, startKey, emptyKey, keyMax, includeStartKey);
+ Range first = new Range(range.left, StorageService.getPartitioner().getMinimumToken());
+ completed = getKeyRange(keys, first, keyMax);
if (!completed)
{
- completed = getKeyRange(keys, emptyKey, finishKey, keyMax, true);
+ Range second = new Range(StorageService.getPartitioner().getMinimumToken(), range.right);
+ completed = getKeyRange(keys, second, keyMax);
}
}
List<Row> rows = new ArrayList<Row>(keys.size());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Sat Feb 6 04:15:39 2010
@@ -38,6 +38,8 @@
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
@@ -66,26 +68,22 @@
public final SlicePredicate predicate;
- public final DecoratedKey startKey;
- public final DecoratedKey finishKey;
+ public final AbstractBounds range;
public final int max_keys;
- public final boolean includeStartKey;
- public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys)
+ public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds range, int max_keys)
{
- this(keyspace, column_parent.getColumn_family(), column_parent.getSuper_column(), predicate, startKey, finishKey, max_keys, true);
+ this(keyspace, column_parent.getColumn_family(), column_parent.getSuper_column(), predicate, range, max_keys);
}
- public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys, boolean includeStartKey)
+ public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, AbstractBounds range, int max_keys)
{
this.keyspace = keyspace;
this.column_family = column_family;
this.super_column = super_column;
this.predicate = predicate;
- this.startKey = startKey;
- this.finishKey = finishKey;
+ this.range = range;
this.max_keys = max_keys;
- this.includeStartKey = includeStartKey;
}
public Message getMessage() throws IOException
@@ -98,6 +96,19 @@
Arrays.copyOf(dob.getData(), dob.getLength()));
}
+ @Override
+ public String toString()
+ {
+ return "RangeSliceCommand{" +
+ "keyspace='" + keyspace + '\'' +
+ ", column_family='" + column_family + '\'' +
+ ", super_column=" + super_column +
+ ", predicate=" + predicate +
+ ", range=" + range +
+ ", max_keys=" + max_keys +
+ '}';
+ }
+
public static RangeSliceCommand read(Message message) throws IOException
{
byte[] bytes = message.getMessageBody();
@@ -118,10 +129,8 @@
TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
FBUtilities.serialize(ser, sliceCommand.predicate, dos);
- DecoratedKey.serializer().serialize(sliceCommand.startKey, dos);
- DecoratedKey.serializer().serialize(sliceCommand.finishKey, dos);
+ Bounds.serializer().serialize(sliceCommand.range, dos);
dos.writeInt(sliceCommand.max_keys);
- dos.writeBoolean(sliceCommand.includeStartKey);
}
public RangeSliceCommand deserialize(DataInputStream dis) throws IOException
@@ -138,11 +147,9 @@
SlicePredicate pred = new SlicePredicate();
FBUtilities.deserialize(dser, pred, dis);
- DecoratedKey startKey = DecoratedKey.serializer().deserialize(dis);
- DecoratedKey finishKey = DecoratedKey.serializer().deserialize(dis);
+ AbstractBounds range = AbstractBounds.serializer().deserialize(dis);
int max_keys = dis.readInt();
- boolean includeStartKey = dis.readBoolean();
- return new RangeSliceCommand(keyspace, column_family, super_column, pred, startKey, finishKey, max_keys, includeStartKey);
+ return new RangeSliceCommand(keyspace, column_family, super_column, pred, range, max_keys);
}
static byte[] readBuf(int len, DataInputStream dis) throws IOException
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=907172&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java Sat Feb 6 04:15:39 2010
@@ -0,0 +1,51 @@
+package org.apache.cassandra.dht;
+
+import java.io.*;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer2;
+
+public abstract class AbstractBounds
+{
+ private static BoundsSerializer serializer_ = new BoundsSerializer();
+
+ private enum Type
+ {
+ RANGE,
+ BOUNDS
+ }
+
+ public static ICompactSerializer2<AbstractBounds> serializer()
+ {
+ return serializer_;
+ }
+
+ public final Token left;
+ public final Token right;
+
+ public AbstractBounds(Token left, Token right)
+ {
+ this.left = left;
+ this.right = right;
+ }
+
+ public abstract List<AbstractBounds> restrictTo(Range range);
+
+ private static class BoundsSerializer implements ICompactSerializer2<AbstractBounds>
+ {
+ public void serialize(AbstractBounds range, DataOutput out) throws IOException
+ {
+ out.writeInt(range instanceof Range ? Type.RANGE.ordinal() : Type.BOUNDS.ordinal());
+ Token.serializer().serialize(range.left, out);
+ Token.serializer().serialize(range.right, out);
+ }
+
+ public AbstractBounds deserialize(DataInput in) throws IOException
+ {
+ if (in.readInt() == Type.RANGE.ordinal())
+ return new Range(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+ return new Bounds(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+ }
+ }
+}
+
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java?rev=907172&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java Sat Feb 6 04:15:39 2010
@@ -0,0 +1,41 @@
+package org.apache.cassandra.dht;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.ObjectUtils;
+
+import org.apache.cassandra.service.StorageService;
+
+public class Bounds extends AbstractBounds
+{
+ public Bounds(Token left, Token right)
+ {
+ super(left, right);
+ // unlike a Range, a Bounds may not wrap
+ assert left.compareTo(right) <= 0 || right.equals(StorageService.getPartitioner().getMinimumToken());
+ }
+
+ public List<AbstractBounds> restrictTo(Range range)
+ {
+ Token left, right;
+ if (range.left.equals(range.right))
+ {
+ left = this.left;
+ right = this.right;
+ }
+ else
+ {
+ left = (Token) ObjectUtils.max(this.left, range.left);
+ right = this.right.equals(StorageService.getPartitioner().getMinimumToken())
+ ? range.right
+ : (Token) ObjectUtils.min(this.right, range.right);
+ }
+ return (List) Arrays.asList(new Bounds(left, right));
+ }
+
+ public String toString()
+ {
+ return "[" + left + "," + right + "]";
+ }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Sat Feb 6 04:15:39 2010
@@ -18,9 +18,6 @@
package org.apache.cassandra.dht;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,37 +26,19 @@
import org.apache.commons.lang.ObjectUtils;
-import org.apache.cassandra.io.ICompactSerializer;
-
/**
* A representation of the range that a node is responsible for on the DHT ring.
*
* A Range is responsible for the tokens between (left, right].
*/
-public class Range implements Comparable<Range>, Serializable
+public class Range extends AbstractBounds implements Comparable<Range>, Serializable
{
public static final long serialVersionUID = 1L;
- private static ICompactSerializer<Range> serializer_;
-
- static
- {
- serializer_ = new RangeSerializer();
- }
-
- public static ICompactSerializer<Range> serializer()
- {
- return serializer_;
- }
-
- public final Token left;
- public final Token right;
-
public Range(Token left, Token right)
{
- this.left = left;
- this.right = right;
+ super(left, right);
}
public static boolean contains(Token left, Token right, Token bi)
@@ -146,6 +125,11 @@
return intersectionOneWrapping(that, this);
}
+ public List<AbstractBounds> restrictTo(Range range)
+ {
+ return (List) intersectionWith(range);
+ }
+
private static List<Range> intersectionOneWrapping(Range wrapping, Range other)
{
List<Range> intersection = new ArrayList<Range>(2);
@@ -211,23 +195,14 @@
{
return toString().hashCode();
}
-
+
public String toString()
{
return "(" + left + "," + right + "]";
}
-}
-
-class RangeSerializer implements ICompactSerializer<Range>
-{
- public void serialize(Range range, DataOutputStream dos) throws IOException
- {
- Token.serializer().serialize(range.left, dos);
- Token.serializer().serialize(range.right, dos);
- }
- public Range deserialize(DataInputStream dis) throws IOException
+ public boolean isWrapAround()
{
- return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+ return isWrapAround(left, right);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Sat Feb 6 04:15:39 2010
@@ -39,12 +39,10 @@
RangeSliceCommand command = RangeSliceCommand.read(message);
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
RangeSliceReply reply = cfs.getRangeSlice(command.super_column,
- command.startKey,
- command.finishKey,
+ command.range,
command.max_keys,
command.predicate.slice_range,
- command.predicate.column_names,
- command.includeStartKey);
+ command.predicate.column_names);
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Feb 6 04:15:39 2010
@@ -27,12 +27,13 @@
import java.util.concurrent.Future;
import java.lang.management.ManagementFactory;
-import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import java.net.InetAddress;
+
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -43,9 +44,6 @@
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.log4j.Logger;
@@ -538,12 +536,15 @@
long startTime = System.nanoTime();
TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
- InetAddress endPoint = StorageService.instance.getPrimary(command.startKey.token);
+ InetAddress endPoint = StorageService.instance.getPrimary(command.range.left);
InetAddress startEndpoint = endPoint;
final String table = command.keyspace;
int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), DatabaseDescriptor.getReplicationFactor(table), consistency_level);
+ // starting with the node that is primary for the start key, scan until either we have enough results,
+ // or the node scan reports that it was done (i.e., encountered a key outside the desired range).
Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
+ outer:
do
{
Range primaryRange = StorageService.instance.getPrimaryRangeForEndPoint(endPoint);
@@ -551,45 +552,35 @@
if (endpoints.size() < responseCount)
throw new UnavailableException();
- // to make comparing the results from each node easy, we restrict each command to the data in the primary range for this iteration
- DecoratedKey<?> startKey;
- DecoratedKey<?> finishKey;
- if (primaryRange.left.equals(primaryRange.right))
- {
- startKey = command.startKey;
- finishKey = command.finishKey;
- }
- else
- {
- startKey = (DecoratedKey<?>) ObjectUtils.max(command.startKey, new DecoratedKey<Token<?>>(primaryRange.left, null));
- finishKey = command.finishKey.isEmpty()
- ? new DecoratedKey<Token<?>>(primaryRange.right, null)
- : (DecoratedKey<?>) ObjectUtils.min(command.finishKey, new DecoratedKey<Token<?>>(primaryRange.right, null));
- }
- RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, startKey, finishKey, command.max_keys, command.includeStartKey);
- Message message = c2.getMessage();
-
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
- QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
- if (logger.isDebugEnabled())
- logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
- for (InetAddress replicaEndpoint : endpoints)
- {
- MessagingService.instance.sendRR(message, replicaEndpoint, handler);
- }
+ // to make comparing the results from each node easy, we restrict each scan the primary range for the node in question
+ List<AbstractBounds> restricted = command.range.restrictTo(primaryRange);
+ for (AbstractBounds range : restricted)
+ {
+ RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
+ Message message = c2.getMessage();
+
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
+ QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
+ if (logger.isDebugEnabled())
+ logger.debug("reading " + c2 + " for " + range + " from " + message.getMessageId() + "@" + endPoint);
+ for (InetAddress replicaEndpoint : endpoints)
+ {
+ MessagingService.instance.sendRR(message, replicaEndpoint, handler);
+ }
- // if we're done, great, otherwise, move to the next range
- try
- {
- rows.putAll(handler.get());
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
+ // if we're done, great, otherwise, move to the next range
+ try
+ {
+ rows.putAll(handler.get());
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // no digests in range slices yet
+ }
+ if (rows.size() >= command.max_keys || resolver.completed())
+ break outer;
}
- if (rows.size() >= command.max_keys || resolver.completed())
- break;
endPoint = tokenMetadata.getSuccessor(endPoint);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Sat Feb 6 04:15:39 2010
@@ -8,6 +8,7 @@
import java.util.Collection;
import java.util.List;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
@@ -67,7 +68,7 @@
dos.writeInt(srMetadata.ranges_.size());
for (Range range : srMetadata.ranges_)
{
- Range.serializer().serialize(range, dos);
+ AbstractBounds.serializer().serialize(range, dos);
}
}
@@ -79,7 +80,7 @@
List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
for( int i = 0; i < size; ++i )
{
- ranges.add(Range.serializer().deserialize(dis));
+ ranges.add((Range) AbstractBounds.serializer().deserialize(dis));
}
return new StreamRequestMetadata(target, ranges, table);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Sat Feb 6 04:15:39 2010
@@ -37,6 +37,7 @@
import static org.apache.cassandra.thrift.ThriftGlue.*;
+import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.Cassandra.Iface;
@@ -555,9 +556,9 @@
List<Pair<String, ColumnFamily>> rows;
try
{
- DecoratedKey startKey = StorageService.getPartitioner().decorateKey(start_key);
- DecoratedKey finishKey = StorageService.getPartitioner().decorateKey(finish_key);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, startKey, finishKey, maxRows), consistency_level);
+ Bounds bounds = new Bounds(StorageService.getPartitioner().decorateKey(start_key).token,
+ StorageService.getPartitioner().decorateKey(finish_key).token);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, maxRows), consistency_level);
assert rows != null;
}
catch (TimeoutException e)
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java Sat Feb 6 04:15:39 2010
@@ -31,6 +31,9 @@
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.SliceRange;
@@ -63,14 +66,12 @@
public static RangeSliceReply getRangeSlice(ColumnFamilyStore cfs) throws IOException, ExecutionException, InterruptedException
{
- DecoratedKey emptyKey = StorageService.getPartitioner().decorateKey("");
+ Token min = StorageService.getPartitioner().getMinimumToken();
return cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY,
- emptyKey,
- emptyKey,
+ new Bounds(min, min),
10000,
new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, false, 10000),
- null,
- true);
+ null);
}
/**
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Sat Feb 6 04:15:39 2010
@@ -137,12 +137,10 @@
IPartitioner p = StorageService.getPartitioner();
RangeSliceReply result = cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY,
- p.decorateKey("key2"),
- p.decorateKey("key1"),
+ new Range(p.getToken("key15"), p.getToken("key1")),
10,
null,
- Arrays.asList("asdf".getBytes()),
- true);
+ Arrays.asList("asdf".getBytes()));
assertEquals(2, result.rows.size());
}
@@ -153,12 +151,10 @@
IPartitioner p = StorageService.getPartitioner();
RangeSliceReply result = cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY,
- p.decorateKey("key1"),
- p.decorateKey("key2"),
+ new Range(p.getToken("key1"), p.getToken("key2")),
10,
null,
- Arrays.asList("asdf".getBytes()),
- false);
+ Arrays.asList("asdf".getBytes()));
assertEquals(1, result.rows.size());
assert result.rows.get(0).key.equals("key2");
}
@@ -175,7 +171,6 @@
rm = new RowMutation("Keyspace2", "key2");
rm.add(new QueryPath("Standard1", null, "Column1".getBytes()), "asdf".getBytes(), 0);
rms.add(rm);
- ColumnFamilyStore cfs = Util.writeColumnFamily(rms);
- return cfs;
+ return Util.writeColumnFamily(rms);
}
}