You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/06 05:15:40 UTC

svn commit: r907172 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ src/java/org/apache/cassandra/thrift/ test/unit/or...

Author: jbellis
Date: Sat Feb  6 04:15:39 2010
New Revision: 907172

URL: http://svn.apache.org/viewvc?rev=907172&view=rev
Log:
have RangeSliceCommand take Range or Bounds (client bounds, start-inclusive, non-wrapping)
patch by jbellis; reviewed by Stu Hood for CASSANDRA-763

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java   (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Feb  6 04:15:39 2010
@@ -39,6 +39,9 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.util.FileUtils;
 
@@ -930,24 +933,22 @@
     }
 
     /**
-     * @param startWith key to start with, inclusive.  empty string = start at beginning.
-     * @param stopAt key to stop at, inclusive.  empty string = stop only when keys are exhausted.
+     * @param range: either a Bounds, which includes start key, or a Range, which does not.
      * @param maxResults
-     * @param includeStartKey
      * @return list of keys between startWith and stopAt
 
        TODO refactor better.  this is just getKeyRange w/o the deletion check, for the benefit of
        range_slice.  still opens one randomaccessfile per key, which sucks.  something like compactioniterator
        would be better.
      */
-    private boolean getKeyRange(List<String> keys, final DecoratedKey startWith, final DecoratedKey stopAt, int maxResults, boolean includeStartKey)
+    private boolean getKeyRange(List<String> keys, final AbstractBounds range, int maxResults)
     throws IOException, ExecutionException, InterruptedException
     {
-        // getKeyRange requires start <= stop.  getRangeSlice handles range wrapping if necessary.
-        assert stopAt.isEmpty() || startWith.compareTo(stopAt) <= 0;
+        final DecoratedKey startWith = new DecoratedKey(range.left, null);
+        final DecoratedKey stopAt = new DecoratedKey(range.right, null);
         // create a CollatedIterator that will return unique keys from different sources
         // (current memtable, historical memtables, and SSTables) in the correct order.
-        List<Iterator<DecoratedKey>> iterators = new ArrayList<Iterator<DecoratedKey>>();
+        final List<Iterator<DecoratedKey>> iterators = new ArrayList<Iterator<DecoratedKey>>();
 
         // we iterate through memtables with a priority queue to avoid more sorting than necessary.
         // this predicate throws out the keys before the start of our range.
@@ -1022,7 +1023,7 @@
                     return true;
                 }
 
-                if (includeStartKey || !first || !current.equals(startWith))
+                if (range instanceof Bounds || !first || !current.equals(startWith))
                 {
                     keys.add(current.key);
                 }
@@ -1050,33 +1051,32 @@
     /**
      *
      * @param super_column
-     * @param startKey key to start at (inclusive). empty string = start at the beginning.
-     * @param finishKey key to stop at (inclusive). empty string = stop at the end.
+     * @param range: either a Bounds, which includes start key, or a Range, which does not.
      * @param keyMax maximum number of keys to process, regardless of startKey/finishKey
      * @param sliceRange may be null if columnNames is specified. specifies contiguous columns to return in what order.
      * @param columnNames may be null if sliceRange is specified. specifies which columns to return in what order.      @return list of key->list<column> tuples.
-     * @param includeStartKey
      * @throws IOException
      * @throws ExecutionException
      * @throws InterruptedException
      */
-    public RangeSliceReply getRangeSlice(byte[] super_column, final DecoratedKey startKey, final DecoratedKey finishKey, int keyMax, SliceRange sliceRange, List<byte[]> columnNames, boolean includeStartKey)
+    public RangeSliceReply getRangeSlice(byte[] super_column, final AbstractBounds range, int keyMax, SliceRange sliceRange, List<byte[]> columnNames)
     throws IOException, ExecutionException, InterruptedException
     {
         List<String> keys = new ArrayList<String>();
         boolean completed;
-        if (finishKey.isEmpty() || startKey.compareTo(finishKey) <= 0)
+        if ((range instanceof Bounds || !((Range)range).isWrapAround()))
         {
-            completed = getKeyRange(keys, startKey, finishKey, keyMax, includeStartKey);
+            completed = getKeyRange(keys, range, keyMax);
         }
         else
         {
             // wrapped range
-            DecoratedKey emptyKey = new DecoratedKey(StorageService.getPartitioner().getMinimumToken(), null);
-            completed = getKeyRange(keys, startKey, emptyKey, keyMax, includeStartKey);
+            Range first = new Range(range.left, StorageService.getPartitioner().getMinimumToken());
+            completed = getKeyRange(keys, first, keyMax);
             if (!completed)
             {
-                completed = getKeyRange(keys, emptyKey, finishKey, keyMax, true);
+                Range second = new Range(StorageService.getPartitioner().getMinimumToken(), range.right);
+                completed = getKeyRange(keys, second, keyMax);
             }
         }
         List<Row> rows = new ArrayList<Row>(keys.size());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Sat Feb  6 04:15:39 2010
@@ -38,6 +38,8 @@
 
 import org.apache.cassandra.concurrent.StageManager;
 
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
@@ -66,26 +68,22 @@
 
     public final SlicePredicate predicate;
 
-    public final DecoratedKey startKey;
-    public final DecoratedKey finishKey;
+    public final AbstractBounds range;
     public final int max_keys;
-    public final boolean includeStartKey;
 
-    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys)
+    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds range, int max_keys)
     {
-        this(keyspace, column_parent.getColumn_family(), column_parent.getSuper_column(), predicate, startKey, finishKey, max_keys, true);
+        this(keyspace, column_parent.getColumn_family(), column_parent.getSuper_column(), predicate, range, max_keys);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys, boolean includeStartKey)
+    public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, AbstractBounds range, int max_keys)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
         this.super_column = super_column;
         this.predicate = predicate;
-        this.startKey = startKey;
-        this.finishKey = finishKey;
+        this.range = range;
         this.max_keys = max_keys;
-        this.includeStartKey = includeStartKey;
     }
 
     public Message getMessage() throws IOException
@@ -98,6 +96,19 @@
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }
 
+    @Override
+    public String toString()
+    {
+        return "RangeSliceCommand{" +
+               "keyspace='" + keyspace + '\'' +
+               ", column_family='" + column_family + '\'' +
+               ", super_column=" + super_column +
+               ", predicate=" + predicate +
+               ", range=" + range +
+               ", max_keys=" + max_keys +
+               '}';
+    }
+
     public static RangeSliceCommand read(Message message) throws IOException
     {
         byte[] bytes = message.getMessageBody();
@@ -118,10 +129,8 @@
 
         TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
         FBUtilities.serialize(ser, sliceCommand.predicate, dos);
-        DecoratedKey.serializer().serialize(sliceCommand.startKey, dos);
-        DecoratedKey.serializer().serialize(sliceCommand.finishKey, dos);
+        Bounds.serializer().serialize(sliceCommand.range, dos);
         dos.writeInt(sliceCommand.max_keys);
-        dos.writeBoolean(sliceCommand.includeStartKey);
     }
 
     public RangeSliceCommand deserialize(DataInputStream dis) throws IOException
@@ -138,11 +147,9 @@
         SlicePredicate pred = new SlicePredicate();
         FBUtilities.deserialize(dser, pred, dis);
 
-        DecoratedKey startKey = DecoratedKey.serializer().deserialize(dis);
-        DecoratedKey finishKey = DecoratedKey.serializer().deserialize(dis);
+        AbstractBounds range = AbstractBounds.serializer().deserialize(dis);
         int max_keys = dis.readInt();
-        boolean includeStartKey = dis.readBoolean();
-        return new RangeSliceCommand(keyspace, column_family, super_column, pred, startKey, finishKey, max_keys, includeStartKey);
+        return new RangeSliceCommand(keyspace, column_family, super_column, pred, range, max_keys);
     }
 
     static byte[] readBuf(int len, DataInputStream dis) throws IOException

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=907172&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java Sat Feb  6 04:15:39 2010
@@ -0,0 +1,51 @@
+package org.apache.cassandra.dht;
+
+import java.io.*;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer2;
+
+public abstract class AbstractBounds
+{
+    private static BoundsSerializer serializer_ = new BoundsSerializer();
+
+    private enum Type
+    {
+        RANGE,
+        BOUNDS
+    }
+
+    public static ICompactSerializer2<AbstractBounds> serializer()
+    {
+        return serializer_;
+    }
+
+    public final Token left;
+    public final Token right;
+
+    public AbstractBounds(Token left, Token right)
+    {
+        this.left = left;
+        this.right = right;
+    }
+
+    public abstract List<AbstractBounds> restrictTo(Range range);
+
+    private static class BoundsSerializer implements ICompactSerializer2<AbstractBounds>
+    {
+        public void serialize(AbstractBounds range, DataOutput out) throws IOException
+        {
+            out.writeInt(range instanceof Range ? Type.RANGE.ordinal() : Type.BOUNDS.ordinal());
+            Token.serializer().serialize(range.left, out);
+            Token.serializer().serialize(range.right, out);
+        }
+
+        public AbstractBounds deserialize(DataInput in) throws IOException
+        {
+            if (in.readInt() == Type.RANGE.ordinal())
+                return new Range(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+            return new Bounds(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+        }
+    }
+}
+

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java?rev=907172&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java Sat Feb  6 04:15:39 2010
@@ -0,0 +1,41 @@
+package org.apache.cassandra.dht;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.ObjectUtils;
+
+import org.apache.cassandra.service.StorageService;
+
+public class Bounds extends AbstractBounds
+{
+    public Bounds(Token left, Token right)
+    {
+        super(left, right);
+        // unlike a Range, a Bounds may not wrap
+        assert left.compareTo(right) <= 0 || right.equals(StorageService.getPartitioner().getMinimumToken());
+    }
+
+    public List<AbstractBounds> restrictTo(Range range)
+    {
+        Token left, right;
+        if (range.left.equals(range.right))
+        {
+            left = this.left;
+            right = this.right;
+        }
+        else
+        {
+            left = (Token) ObjectUtils.max(this.left, range.left);
+            right = this.right.equals(StorageService.getPartitioner().getMinimumToken())
+                    ? range.right
+                    : (Token) ObjectUtils.min(this.right, range.right);
+        }
+        return (List) Arrays.asList(new Bounds(left, right));
+    }
+
+    public String toString()
+    {
+        return "[" + left + "," + right + "]";
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Sat Feb  6 04:15:39 2010
@@ -18,9 +18,6 @@
 
 package org.apache.cassandra.dht;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,37 +26,19 @@
 
 import org.apache.commons.lang.ObjectUtils;
 
-import org.apache.cassandra.io.ICompactSerializer;
-
 
 /**
  * A representation of the range that a node is responsible for on the DHT ring.
  *
  * A Range is responsible for the tokens between (left, right].
  */
-public class Range implements Comparable<Range>, Serializable
+public class Range extends AbstractBounds implements Comparable<Range>, Serializable
 {
     public static final long serialVersionUID = 1L;
     
-    private static ICompactSerializer<Range> serializer_;
-
-    static
-    {
-        serializer_ = new RangeSerializer();
-    }
-    
-    public static ICompactSerializer<Range> serializer()
-    {
-        return serializer_;
-    }
-
-    public final Token left;
-    public final Token right;
-
     public Range(Token left, Token right)
     {
-        this.left = left;
-        this.right = right;
+        super(left, right);
     }
 
     public static boolean contains(Token left, Token right, Token bi)
@@ -146,6 +125,11 @@
         return intersectionOneWrapping(that, this);
     }
 
+    public List<AbstractBounds> restrictTo(Range range)
+    {
+        return (List) intersectionWith(range);
+    }
+
     private static List<Range> intersectionOneWrapping(Range wrapping, Range other)
     {
         List<Range> intersection = new ArrayList<Range>(2);
@@ -211,23 +195,14 @@
     {
         return toString().hashCode();
     }
-    
+
     public String toString()
     {
         return "(" + left + "," + right + "]";
     }
-}
-
-class RangeSerializer implements ICompactSerializer<Range>
-{
-    public void serialize(Range range, DataOutputStream dos) throws IOException
-    {
-        Token.serializer().serialize(range.left, dos);
-        Token.serializer().serialize(range.right, dos);
-    }
 
-    public Range deserialize(DataInputStream dis) throws IOException
+    public boolean isWrapAround()
     {
-        return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+        return isWrapAround(left, right);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Sat Feb  6 04:15:39 2010
@@ -39,12 +39,10 @@
             RangeSliceCommand command = RangeSliceCommand.read(message);
             ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
             RangeSliceReply reply = cfs.getRangeSlice(command.super_column,
-                                                      command.startKey,
-                                                      command.finishKey,
+                                                      command.range,
                                                       command.max_keys,
                                                       command.predicate.slice_range,
-                                                      command.predicate.column_names,
-                                                      command.includeStartKey);
+                                                      command.predicate.column_names);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Feb  6 04:15:39 2010
@@ -27,12 +27,13 @@
 import java.util.concurrent.Future;
 import java.lang.management.ManagementFactory;
 
-import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import java.net.InetAddress;
+
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -43,9 +44,6 @@
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.concurrent.StageManager;
 
 import org.apache.log4j.Logger;
@@ -538,12 +536,15 @@
         long startTime = System.nanoTime();
         TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
 
-        InetAddress endPoint = StorageService.instance.getPrimary(command.startKey.token);
+        InetAddress endPoint = StorageService.instance.getPrimary(command.range.left);
         InetAddress startEndpoint = endPoint;
         final String table = command.keyspace;
         int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), DatabaseDescriptor.getReplicationFactor(table), consistency_level);
 
+        // starting with the node that is primary for the start key, scan until either we have enough results,
+        // or the node scan reports that it was done (i.e., encountered a key outside the desired range).
         Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
+        outer:
         do
         {
             Range primaryRange = StorageService.instance.getPrimaryRangeForEndPoint(endPoint);
@@ -551,45 +552,35 @@
             if (endpoints.size() < responseCount)
                 throw new UnavailableException();
 
-            // to make comparing the results from each node easy, we restrict each command to the data in the primary range for this iteration
-            DecoratedKey<?> startKey;
-            DecoratedKey<?> finishKey;
-            if (primaryRange.left.equals(primaryRange.right))
-            {
-                startKey = command.startKey;
-                finishKey = command.finishKey;
-            }
-            else
-            {
-                startKey = (DecoratedKey<?>) ObjectUtils.max(command.startKey, new DecoratedKey<Token<?>>(primaryRange.left, null));
-                finishKey = command.finishKey.isEmpty()
-                          ? new DecoratedKey<Token<?>>(primaryRange.right, null)
-                          : (DecoratedKey<?>) ObjectUtils.min(command.finishKey, new DecoratedKey<Token<?>>(primaryRange.right, null));
-            }
-            RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, startKey, finishKey, command.max_keys, command.includeStartKey);
-            Message message = c2.getMessage();
-
-            // collect replies and resolve according to consistency level
-            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
-            QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
-            if (logger.isDebugEnabled())
-                logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
-            for (InetAddress replicaEndpoint : endpoints)
-            {
-                MessagingService.instance.sendRR(message, replicaEndpoint, handler);
-            }
+            // to make comparing the results from each node easy, we restrict each scan the primary range for the node in question
+            List<AbstractBounds> restricted = command.range.restrictTo(primaryRange);
+            for (AbstractBounds range : restricted)
+            {
+                RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
+                Message message = c2.getMessage();
+
+                // collect replies and resolve according to consistency level
+                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
+                QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
+                if (logger.isDebugEnabled())
+                    logger.debug("reading " + c2 + " for " + range + " from " + message.getMessageId() + "@" + endPoint);
+                for (InetAddress replicaEndpoint : endpoints)
+                {
+                    MessagingService.instance.sendRR(message, replicaEndpoint, handler);
+                }
 
-            // if we're done, great, otherwise, move to the next range
-            try
-            {
-                rows.putAll(handler.get());
-            }
-            catch (DigestMismatchException e)
-            {
-                throw new AssertionError(e); // no digests in range slices yet
+                // if we're done, great, otherwise, move to the next range
+                try
+                {
+                    rows.putAll(handler.get());
+                }
+                catch (DigestMismatchException e)
+                {
+                    throw new AssertionError(e); // no digests in range slices yet
+                }
+                if (rows.size() >= command.max_keys || resolver.completed())
+                    break outer;
             }
-            if (rows.size() >= command.max_keys || resolver.completed())
-                break;
 
             endPoint = tokenMetadata.getSuccessor(endPoint);
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Sat Feb  6 04:15:39 2010
@@ -8,6 +8,7 @@
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndPointSerializationHelper;
@@ -67,7 +68,7 @@
         dos.writeInt(srMetadata.ranges_.size());
         for (Range range : srMetadata.ranges_)
         {
-            Range.serializer().serialize(range, dos);
+            AbstractBounds.serializer().serialize(range, dos);
         }
     }
 
@@ -79,7 +80,7 @@
         List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
         for( int i = 0; i < size; ++i )
         {
-            ranges.add(Range.serializer().deserialize(dis));
+            ranges.add((Range) AbstractBounds.serializer().deserialize(dis));
         }
         return new StreamRequestMetadata(target, ranges, table);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Sat Feb  6 04:15:39 2010
@@ -37,6 +37,7 @@
 
 import static org.apache.cassandra.thrift.ThriftGlue.*;
 
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Cassandra.Iface;
@@ -555,9 +556,9 @@
         List<Pair<String, ColumnFamily>> rows;
         try
         {
-            DecoratedKey startKey = StorageService.getPartitioner().decorateKey(start_key);
-            DecoratedKey finishKey = StorageService.getPartitioner().decorateKey(finish_key);
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, startKey, finishKey, maxRows), consistency_level);
+            Bounds bounds = new Bounds(StorageService.getPartitioner().decorateKey(start_key).token,
+                                       StorageService.getPartitioner().decorateKey(finish_key).token);
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, maxRows), consistency_level);
             assert rows != null;
         }
         catch (TimeoutException e)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java Sat Feb  6 04:15:39 2010
@@ -31,6 +31,9 @@
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SliceRange;
 
@@ -63,14 +66,12 @@
 
     public static RangeSliceReply getRangeSlice(ColumnFamilyStore cfs) throws IOException, ExecutionException, InterruptedException
     {
-        DecoratedKey emptyKey = StorageService.getPartitioner().decorateKey("");
+        Token min = StorageService.getPartitioner().getMinimumToken();
         return cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY,
-                                 emptyKey,
-                                 emptyKey,
+                                 new Bounds(min, min),
                                  10000,
                                  new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, false, 10000),
-                                 null,
-                                 true);
+                                 null);
     }
 
     /**

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=907172&r1=907171&r2=907172&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Sat Feb  6 04:15:39 2010
@@ -137,12 +137,10 @@
 
         IPartitioner p = StorageService.getPartitioner();
         RangeSliceReply result = cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                   p.decorateKey("key2"),
-                                                   p.decorateKey("key1"),
+                                                   new Range(p.getToken("key15"), p.getToken("key1")),
                                                    10,
                                                    null,
-                                                   Arrays.asList("asdf".getBytes()),
-                                                   true);
+                                                   Arrays.asList("asdf".getBytes()));
         assertEquals(2, result.rows.size());
     }
 
@@ -153,12 +151,10 @@
 
         IPartitioner p = StorageService.getPartitioner();
         RangeSliceReply result = cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                   p.decorateKey("key1"),
-                                                   p.decorateKey("key2"),
+                                                   new Range(p.getToken("key1"), p.getToken("key2")),
                                                    10,
                                                    null,
-                                                   Arrays.asList("asdf".getBytes()),
-                                                   false);
+                                                   Arrays.asList("asdf".getBytes()));
         assertEquals(1, result.rows.size());
         assert result.rows.get(0).key.equals("key2");
     }
@@ -175,7 +171,6 @@
         rm = new RowMutation("Keyspace2", "key2");
         rm.add(new QueryPath("Standard1", null, "Column1".getBytes()), "asdf".getBytes(), 0);
         rms.add(rm);
-        ColumnFamilyStore cfs = Util.writeColumnFamily(rms);
-        return cfs;
+        return Util.writeColumnFamily(rms);
     }
 }