You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/23 18:05:47 UTC

svn commit: r915437 [1/2] - in /incubator/cassandra/branches/cassandra-0.5: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/s...

Author: jbellis
Date: Tue Feb 23 17:05:46 2010
New Revision: 915437

URL: http://svn.apache.org/viewvc?rev=915437&view=rev
Log:
backport fix for CASSANDRA-781 to 0.5.  patch by jbellis; tested by Hernan Badenes.

I took the approach of changing the StorageProxy code to match 0.6 and then pulling in pieces of 0.6 to make the build work as necessary, rather than trying to make the StorageProxy code match 0.5 better.  This (a) simplifies the backport by making it mostly a matter of copy and paste, reducing the chance of bugs in the backported code that didn't exist in the original, and (b) simplifies future bugfixes in this area, if they should be needed.

Added:
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/AbstractBounds.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Bounds.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BoundsTest.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java
Modified:
    incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/DecoratedKey.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceReply.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/RandomPartitioner.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Range.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/ThriftValidation.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/FBUtilities.java
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/MerkleTree.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/RangeTest.java
    incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java

Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Tue Feb 23 17:05:46 2010
@@ -8,7 +8,9 @@
  * allow larger numbers of keys (> 140M) in a sstable bloom filter
    (CASSANDRA-790)
  * include jvm argument improvements from CASSANDRA-504 in debian package
- * change streaming chunk size to 32MB (was 64MB) (CASSANDRA-795)
+ * change streaming chunk size to 32MB to accommodate Windows XP limitations
+   (was 64MB) (CASSANDRA-795)
+ * fix get_range_slice returning results in the wrong order (CASSANDRA-781)
  
 
 0.5.0 final

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Feb 23 17:05:46 2010
@@ -1532,7 +1532,7 @@
             rows.add(new Row(key, getColumnFamily(filter)));
         }
 
-        return new RangeSliceReply(rows, rr.rangeCompletedLocally);
+        return new RangeSliceReply(rows);
     }
 
     public AbstractType getComparator()

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/DecoratedKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/DecoratedKey.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/DecoratedKey.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/DecoratedKey.java Tue Feb 23 17:05:46 2010
@@ -23,8 +23,10 @@
 import java.io.DataInput;
 import java.util.Comparator;
 
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -39,6 +41,7 @@
 public class DecoratedKey<T extends Token> implements Comparable<DecoratedKey>
 {
     private static DecoratedKeySerializer serializer = new DecoratedKeySerializer();
+    private static IPartitioner partitioner = StorageService.getPartitioner();
 
     public static DecoratedKeySerializer serializer()
     {
@@ -67,11 +70,7 @@
     @Override
     public int hashCode()
     {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((key == null) ? 0 : key.hashCode());
-        result = prime * result + ((token == null) ? 0 : token.hashCode());
-        return result;
+        return token.hashCode();
     }
 
     @Override
@@ -95,7 +94,7 @@
 
     public boolean isEmpty()
     {
-        return key != null && key.isEmpty();
+        return token.equals(partitioner.getMinimumToken());
     }
 
     @Override

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue Feb 23 17:05:46 2010
@@ -36,8 +36,14 @@
 
 package org.apache.cassandra.db;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
@@ -46,19 +52,13 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 public class RangeSliceCommand
 {
-    private static final SliceCommandSerializer serializer = new SliceCommandSerializer();
-    
+    private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
+
     public final String keyspace;
 
     public final String column_family;
@@ -66,29 +66,21 @@
 
     public final SlicePredicate predicate;
 
-    public final DecoratedKey startKey;
-    public final DecoratedKey finishKey;
+    public final AbstractBounds range;
     public final int max_keys;
 
-    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys)
+    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds range, int max_keys)
     {
-        this.keyspace = keyspace;
-        column_family = column_parent.getColumn_family();
-        super_column = column_parent.getSuper_column();
-        this.predicate = predicate;
-        this.startKey = startKey;
-        this.finishKey = finishKey;
-        this.max_keys = max_keys;
+        this(keyspace, column_parent.getColumn_family(), column_parent.getSuper_column(), predicate, range, max_keys);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys)
+    public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, AbstractBounds range, int max_keys)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
         this.super_column = super_column;
         this.predicate = predicate;
-        this.startKey = startKey;
-        this.finishKey = finishKey;
+        this.range = range;
         this.max_keys = max_keys;
     }
 
@@ -102,16 +94,28 @@
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }
 
+    @Override
+    public String toString()
+    {
+        return "RangeSliceCommand{" +
+               "keyspace='" + keyspace + '\'' +
+               ", column_family='" + column_family + '\'' +
+               ", super_column=" + super_column +
+               ", predicate=" + predicate +
+               ", range=" + range +
+               ", max_keys=" + max_keys +
+               '}';
+    }
+
     public static RangeSliceCommand read(Message message) throws IOException
     {
         byte[] bytes = message.getMessageBody();
-        DataInputBuffer dib = new DataInputBuffer();
-        dib.reset(bytes, bytes.length);
-        return serializer.deserialize(new DataInputStream(dib));
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        return serializer.deserialize(new DataInputStream(bis));
     }
 }
 
-class SliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
+class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
 {
     public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos) throws IOException
     {
@@ -122,19 +126,8 @@
             dos.write(sliceCommand.super_column);
 
         TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            byte[] serPred = ser.serialize(sliceCommand.predicate);
-            dos.writeInt(serPred.length);
-            dos.write(serPred);
-        }
-        catch (TException ex)
-        {
-            throw new IOException(ex);
-        }
-
-        DecoratedKey.serializer().serialize(sliceCommand.startKey, dos);
-        DecoratedKey.serializer().serialize(sliceCommand.finishKey, dos);
+        FBUtilities.serialize(ser, sliceCommand.predicate, dos);
+        AbstractBounds.serializer().serialize(sliceCommand.range, dos);
         dos.writeInt(sliceCommand.max_keys);
     }
 
@@ -148,29 +141,13 @@
         if (scLength > 0)
             super_column = readBuf(scLength, dis);
 
-        byte[] predBytes = new byte[dis.readInt()];
-        dis.readFully(predBytes);
         TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
-        SlicePredicate pred =  new SlicePredicate();
-        try
-        {
-            dser.deserialize(pred, predBytes);
-        }
-        catch (TException ex)
-        {
-            throw new IOException(ex);
-        }
+        SlicePredicate pred = new SlicePredicate();
+        FBUtilities.deserialize(dser, pred, dis);
 
-        DecoratedKey startKey = DecoratedKey.serializer().deserialize(dis);
-        DecoratedKey finishKey = DecoratedKey.serializer().deserialize(dis);
+        AbstractBounds range = AbstractBounds.serializer().deserialize(dis);
         int max_keys = dis.readInt();
-        return new RangeSliceCommand(keyspace,
-                                     new ColumnParent(column_family, super_column),
-                                     pred,
-                                     startKey,
-                                     finishKey,
-                                     max_keys);
-
+        return new RangeSliceCommand(keyspace, column_family, super_column, pred, range, max_keys);
     }
 
     static byte[] readBuf(int len, DataInputStream dis) throws IOException

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/RangeSliceReply.java Tue Feb 23 17:05:46 2010
@@ -18,35 +18,28 @@
 
 package org.apache.cassandra.db;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.StringUtils;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.util.*;
 
 public class RangeSliceReply
 {
     public final List<Row> rows;
-    public final boolean rangeCompletedLocally;
 
-    public RangeSliceReply(List<Row> rows, boolean rangeCompletedLocally)
+    public RangeSliceReply(List<Row> rows)
     {
         this.rows = rows;
-        this.rangeCompletedLocally = rangeCompletedLocally;
     }
 
     public Message getReply(Message originalMessage) throws IOException
     {
         DataOutputBuffer dob = new DataOutputBuffer();
-        dob.writeBoolean(rangeCompletedLocally);
         dob.writeInt(rows.size());
         for (Row row : rows)
         {
@@ -61,21 +54,19 @@
     {
         return "RangeSliceReply{" +
                "rows=" + StringUtils.join(rows, ",") +
-               ", rangeCompletedLocally=" + rangeCompletedLocally +
                '}';
     }
 
     public static RangeSliceReply read(byte[] body) throws IOException
     {
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(body, body.length);
-        boolean completed = bufIn.readBoolean();
-        int rowCount = bufIn.readInt();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        DataInputStream dis = new DataInputStream(bufIn);
+        int rowCount = dis.readInt();
         List<Row> rows = new ArrayList<Row>(rowCount);
         for (int i = 0; i < rowCount; i++)
         {
-            rows.add(Row.serializer().deserialize(bufIn));
+            rows.add(Row.serializer().deserialize(dis));
         }
-        return new RangeSliceReply(rows, completed);
+        return new RangeSliceReply(rows);
     }
 }

Added: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/AbstractBounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=915437&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/AbstractBounds.java (added)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/AbstractBounds.java Tue Feb 23 17:05:46 2010
@@ -0,0 +1,71 @@
+package org.apache.cassandra.dht;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.ICompactSerializer2;
+
+public abstract class AbstractBounds implements Serializable
+{
+    private static AbstractBoundsSerializer serializer = new AbstractBoundsSerializer();
+
+    public static ICompactSerializer2<AbstractBounds> serializer()
+    {
+        return serializer;
+    }
+
+    private enum Type
+    {
+        RANGE,
+        BOUNDS
+    }
+
+    public final Token left;
+    public final Token right;
+
+    protected transient final IPartitioner partitioner;
+
+    public AbstractBounds(Token left, Token right, IPartitioner partitioner)
+    {
+        this.left = left;
+        this.right = right;
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return toString().hashCode();
+    }
+
+    @Override
+    public abstract boolean equals(Object obj);
+
+    public abstract boolean contains(Token start);
+
+    public abstract Set<AbstractBounds> restrictTo(Range range);
+
+    public abstract List<AbstractBounds> unwrap();
+
+    private static class AbstractBoundsSerializer implements ICompactSerializer2<AbstractBounds>
+    {
+        public void serialize(AbstractBounds range, DataOutput out) throws IOException
+        {
+            out.writeInt(range instanceof Range ? Type.RANGE.ordinal() : Type.BOUNDS.ordinal());
+            Token.serializer().serialize(range.left, out);
+            Token.serializer().serialize(range.right, out);
+        }
+
+        public AbstractBounds deserialize(DataInput in) throws IOException
+        {
+            if (in.readInt() == Type.RANGE.ordinal())
+                return new Range(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+            return new Bounds(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+        }
+    }
+}
+

Added: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Bounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Bounds.java?rev=915437&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Bounds.java (added)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Bounds.java Tue Feb 23 17:05:46 2010
@@ -0,0 +1,73 @@
+package org.apache.cassandra.dht;
+
+import java.util.*;
+
+import org.apache.cassandra.service.StorageService;
+
+public class Bounds extends AbstractBounds
+{
+    public Bounds(Token left, Token right)
+    {
+        this(left, right, StorageService.getPartitioner());
+    }
+
+    Bounds(Token left, Token right, IPartitioner partitioner)
+    {
+        super(left, right, partitioner);
+        // unlike a Range, a Bounds may not wrap
+        assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
+    }
+
+    @Override
+    public boolean contains(Token token)
+    {
+        return Range.contains(left, right, token) || left.equals(token);
+    }
+
+    public Set<AbstractBounds> restrictTo(Range range)
+    {
+        Token min = partitioner.getMinimumToken();
+
+        // special case Bounds where left=right (single Token)
+        if (this.left.equals(this.right) && !this.right.equals(min))
+            return range.contains(this.left)
+                   ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
+                   : Collections.<AbstractBounds>emptySet();
+
+        // get the intersection of a Range w/ same left & right
+        Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
+        // if range doesn't contain left token anyway, that's the correct answer
+        if (!range.contains(this.left))
+            return (Set) ranges;
+        // otherwise, add back in the left token
+        Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
+        for (Range restricted : ranges)
+        {
+            if (restricted.left.equals(this.left))
+                S.add(new Bounds(restricted.left, restricted.right));
+            else
+                S.add(restricted);
+        }
+        return Collections.unmodifiableSet(S);
+    }
+
+    public List<AbstractBounds> unwrap()
+    {
+        // Bounds objects never wrap
+        return (List)Arrays.asList(this);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof Bounds))
+            return false;
+        Bounds rhs = (Bounds)o;
+        return left.equals(rhs.left) && right.equals(rhs.right);
+    }
+
+    public String toString()
+    {
+        return "[" + left + "," + right + "]";
+    }
+}

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java Tue Feb 23 17:05:46 2010
@@ -25,6 +25,8 @@
 import java.util.Locale;
 import java.util.Random;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -33,7 +35,7 @@
 {
     static final Collator collator = Collator.getInstance(new Locale("en", "US"));
 
-    public static final BytesToken MINIMUM = new BytesToken(new byte[0]);
+    public static final BytesToken MINIMUM = new BytesToken(ArrayUtils.EMPTY_BYTE_ARRAY);
     
     public static final BigInteger BYTE_MASK = new BigInteger("255");
 
@@ -146,6 +148,8 @@
 
     public BytesToken getToken(String key)
     {
+        if (key.isEmpty())
+            return MINIMUM;
         return new BytesToken(collator.getCollationKey(key).toByteArray());
     }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/RandomPartitioner.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/RandomPartitioner.java Tue Feb 23 17:05:46 2010
@@ -113,6 +113,8 @@
 
     public BigIntegerToken getToken(String key)
     {
+        if (key.isEmpty())
+            return MINIMUM;
         return new BigIntegerToken(FBUtilities.hash(key));
     }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Range.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/dht/Range.java Tue Feb 23 17:05:46 2010
@@ -18,13 +18,12 @@
 
 package org.apache.cassandra.dht;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
+import java.util.*;
 
-import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.commons.lang.ObjectUtils;
+
+import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -32,51 +31,23 @@
  *
  * A Range is responsible for the tokens between (left, right].
  */
-public class Range implements Comparable<Range>, Serializable
+public class Range extends AbstractBounds implements Comparable<Range>, Serializable
 {
     public static final long serialVersionUID = 1L;
     
-    private static ICompactSerializer<Range> serializer_;
-    static
-    {
-        serializer_ = new RangeSerializer();
-    }
-    
-    public static ICompactSerializer<Range> serializer()
-    {
-        return serializer_;
-    }
-
-    private final Token left_;
-    private final Token right_;
-
     public Range(Token left, Token right)
     {
-        left_ = left;
-        right_ = right;
+        this(left, right, StorageService.getPartitioner());
     }
 
-    /**
-     * Returns the left endpoint of a range.
-     * @return left endpoint
-     */
-    public Token left()
-    {
-        return left_;
-    }
-    
-    /**
-     * Returns the right endpoint of a range.
-     * @return right endpoint
-     */
-    public Token right()
+    public Range(Token left, Token right, IPartitioner partitioner)
     {
-        return right_;
+        super(left, right, partitioner);
     }
 
     public static boolean contains(Token left, Token right, Token bi)
     {
-        if ( isWrapAround(left, right) )
+        if (isWrapAround(left, right))
         {
             /* 
              * We are wrapping around, so the interval is (a,b] where a >= b,
@@ -85,7 +56,7 @@
              * (2) k <= b -- return true
              * (3) b < k <= a -- return false
              */
-            if ( bi.compareTo(left) > 0 )
+            if (bi.compareTo(left) > 0)
                 return true;
             else
                 return right.compareTo(bi) >= 0;
@@ -95,24 +66,36 @@
             /*
              * This is the range (a, b] where a < b. 
              */
-            return ( bi.compareTo(left) > 0 && right.compareTo(bi) >= 0 );
-        }        
+            return (bi.compareTo(left) > 0 && right.compareTo(bi) >= 0);
+        }
     }
 
     public boolean contains(Range that)
     {
-        boolean thiswraps = isWrapAround(this.left(), this.right());
-        boolean thatwraps = isWrapAround(that.left(), that.right());
+        if (this.left.equals(this.right))
+        {
+            // full ring always contains all other ranges
+            return true;
+        }
+
+        boolean thiswraps = isWrapAround(left, right);
+        boolean thatwraps = isWrapAround(that.left, that.right);
         if (thiswraps == thatwraps)
-            return this.left().compareTo(that.left()) <= 0 &&
-                that.right().compareTo(this.right()) <= 0;
+        {
+            return left.compareTo(that.left) <= 0 && that.right.compareTo(right) <= 0;
+        }
         else if (thiswraps)
+        {
             // wrapping might contain non-wrapping
-            return this.left().compareTo(that.left()) <= 0 ||
-                that.right().compareTo(this.right()) <= 0;
-        else // (thatwraps)
+            // that is contained if both its tokens are in one of our wrap segments
+            return left.compareTo(that.left) <= 0 || that.right.compareTo(right) <= 0;
+        }
+        else
+        {
+            // (thatwraps)
             // non-wrapping cannot contain wrapping
             return false;
+        }
     }
 
     /**
@@ -123,35 +106,106 @@
      */
     public boolean contains(Token bi)
     {
-        return contains(left_, right_, bi);
+        return contains(left, right, bi);
     }
 
     /**
-     * @param range range to check for intersection
+     * @param that range to check for intersection
      * @return true if the given range intersects with this range.
      */
     public boolean intersects(Range that)
     {
-        boolean thiswraps = isWrapAround(this.left(), this.right());
-        boolean thatwraps = isWrapAround(that.left(), that.right());
+        return intersectionWith(that).size() > 0;
+    }
+
+    public static Set<Range> rangeSet(Range ... ranges)
+    {
+        return Collections.unmodifiableSet(new HashSet<Range>(Arrays.asList(ranges)));
+    }
+
+    /**
+     * @param that
+     * @return the intersection of the two Ranges.  this can be two disjoint Ranges if one is wrapping and one is not.
+     * say you have nodes G and M, with query range (D,T]; the intersection is (M-T] and (D-G].
+     * If there is no intersection, an empty set is returned.
+     */
+    public Set<Range> intersectionWith(Range that)
+    {
+        if (this.contains(that))
+            return rangeSet(that);
+        if (that.contains(this))
+            return rangeSet(this);
+
+        boolean thiswraps = isWrapAround(left, right);
+        boolean thatwraps = isWrapAround(that.left, that.right);
+        if (!thiswraps && !thatwraps)
+        {
+            // neither wraps.  the straightforward case.
+            if (!(left.compareTo(that.right) < 0 && that.left.compareTo(right) < 0))
+                return Collections.emptySet();
+            return rangeSet(new Range((Token)ObjectUtils.max(this.left, that.left),
+                                      (Token)ObjectUtils.min(this.right, that.right)));
+        }
         if (thiswraps && thatwraps)
-            // both (must contain the minimum token)
-            return true;
-        else if (!thiswraps && !thatwraps)
-            // neither
-            return this.left().compareTo(that.right()) < 0 &&
-                that.left().compareTo(this.right()) < 0;
-        else
-            // either
-            return this.left().compareTo(that.right()) < 0 ||
-                that.left().compareTo(this.right()) < 0;
+        {
+            // if the starts are the same, one contains the other, which we have already ruled out.
+            assert !this.left.equals(that.left);
+            // two wrapping ranges always intersect.
+            // since we have already determined that neither this nor that contains the other, we have 2 cases,
+            // and mirror images of those cases.
+            // (1) both of that's (1, 2] endpoints lie in this's (A, B] right segment:
+            //  ---------B--------A--1----2------>
+            // (2) only that's start endpoint lies in this's right segment:
+            //  ---------B----1---A-------2------>
+            // or, we have the same cases on the left segment, which we can handle by swapping this and that.
+            return this.left.compareTo(that.left) < 0
+                   ? intersectionBothWrapping(this, that)
+                   : intersectionBothWrapping(that, this);
+        }
+        if (thiswraps && !thatwraps)
+            return intersectionOneWrapping(this, that);
+        assert (!thiswraps && thatwraps);
+        return intersectionOneWrapping(that, this);
+    }
+
+    private static Set<Range> intersectionBothWrapping(Range first, Range that)
+    {
+        Set<Range> intersection = new HashSet<Range>(2);
+        if (that.right.compareTo(first.left) > 0)
+            intersection.add(new Range(first.left, that.right));
+        intersection.add(new Range(that.left, first.right));
+        return Collections.unmodifiableSet(intersection);
+    }
+
+    private static Set<Range> intersectionOneWrapping(Range wrapping, Range other)
+    {
+        Set<Range> intersection = new HashSet<Range>(2);
+        if (other.contains(wrapping.right))
+            intersection.add(new Range(other.left, wrapping.right));
+        // need the extra compareto here because ranges are asymmetrical; wrapping.left _is not_ contained by the wrapping range
+        if (other.contains(wrapping.left) && wrapping.left.compareTo(other.right) < 0)
+            intersection.add(new Range(wrapping.left, other.right));
+        return Collections.unmodifiableSet(intersection);
+    }
+
+    public Set<AbstractBounds> restrictTo(Range range)
+    {
+        return (Set) intersectionWith(range);
+    }
+
+    public List<AbstractBounds> unwrap()
+    {
+        if (!isWrapAround() || right.equals(partitioner.getMinimumToken()))
+            return (List)Arrays.asList(this);
+        List<AbstractBounds> unwrapped = new ArrayList<AbstractBounds>(2);
+        unwrapped.add(new Range(left, partitioner.getMinimumToken()));
+        unwrapped.add(new Range(partitioner.getMinimumToken(), right));
+        return unwrapped;
     }
 
     /**
      * Tells if the given range is a wrap around.
-     * @param range
-     * @return
-     */
+         */
     public static boolean isWrapAround(Token left, Token right)
     {
         return left.compareTo(right) >= 0;
@@ -163,13 +217,13 @@
          * If the range represented by the "this" pointer
          * is a wrap around then it is the smaller one.
          */
-        if ( isWrapAround(left(), right()) )
+        if ( isWrapAround(left, right) )
             return -1;
-        
-        if ( isWrapAround(rhs.left(), rhs.right()) )
+
+        if ( isWrapAround(rhs.left, rhs.right) )
             return 1;
         
-        return right_.compareTo(rhs.right_);
+        return right.compareTo(rhs.right);
     }
     
 
@@ -189,34 +243,19 @@
 
     public boolean equals(Object o)
     {
-        if ( !(o instanceof Range) )
+        if (!(o instanceof Range))
             return false;
         Range rhs = (Range)o;
-        return left_.equals(rhs.left_) && right_.equals(rhs.right_);
-    }
-    
-    @Override
-    public int hashCode()
-    {
-        return toString().hashCode();
+        return left.equals(rhs.left) && right.equals(rhs.right);
     }
     
     public String toString()
     {
-        return "(" + left_ + "," + right_ + "]";
-    }
-}
-
-class RangeSerializer implements ICompactSerializer<Range>
-{
-    public void serialize(Range range, DataOutputStream dos) throws IOException
-    {
-        Token.serializer().serialize(range.left(), dos);
-        Token.serializer().serialize(range.right(), dos);
+        return "(" + left + "," + right + "]";
     }
 
-    public Range deserialize(DataInputStream dis) throws IOException
+    public boolean isWrapAround()
     {
-        return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
+        return isWrapAround(left, right);
     }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/StreamRequestMetadata.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/StreamRequestMetadata.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/io/StreamRequestMetadata.java Tue Feb 23 17:05:46 2010
@@ -90,7 +90,7 @@
         List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
         for( int i = 0; i < size; ++i )
         {
-            ranges.add(Range.serializer().deserialize(dis));
+            ranges.add((Range) Range.serializer().deserialize(dis));
         }            
         return new StreamRequestMetadata( target, ranges );
     }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue Feb 23 17:05:46 2010
@@ -22,6 +22,7 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.collect.*;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.Range;
 
@@ -29,11 +30,6 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
-
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
@@ -472,4 +468,41 @@
         return sb.toString();
     }
 
+    /**
+     * iterator over the Tokens in the given ring (a non-empty list sorted in ascending order), starting with the
+     * token for the node owning start (which does not have to be a Token in the ring); each Token is returned once
+     */
+    public static Iterator<Token> ringIterator(final List ring, Token start)
+    {
+        assert ring.size() > 0;
+        int i = Collections.binarySearch(ring, start);
+        if (i < 0)
+        {
+            i = (i + 1) * (-1);
+            if (i >= ring.size())
+            {
+                i = 0;
+            }
+        }
+        final int startIndex = i;
+        return new AbstractIterator<Token>()
+        {
+            int j = startIndex;
+            protected Token computeNext()
+            {
+                if (j < 0)
+                    return endOfData();
+                try
+                {
+                    return (Token) ring.get(j);
+                }
+                finally
+                {
+                    j = (j + 1) % ring.size();
+                    if (j == startIndex)
+                        j = -1;
+                }
+            }
+        };
+    }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/CassandraServer.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/CassandraServer.java Tue Feb 23 17:05:46 2010
@@ -22,6 +22,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.log4j.Logger;
 
@@ -32,6 +33,7 @@
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.Pair;
@@ -465,29 +467,28 @@
         return columnFamiliesMap;
     }
 
-    public List<KeySlice> get_range_slice(String keyspace, ColumnParent column_parent, SlicePredicate predicate, String start_key, String finish_key, int maxRows, int consistency_level)
-            throws InvalidRequestException, UnavailableException, TException, TimedOutException
+    public List<KeySlice> get_range_slice(String keyspace, ColumnParent column_parent, SlicePredicate predicate, String start_key, String end_key, int maxRows, int consistency_level)
+    throws InvalidRequestException, UnavailableException, TException, TimedOutException
     {
         if (logger.isDebugEnabled())
-            logger.debug("range_slice");
+            logger.debug("get_range_slice " + start_key + " to " + end_key);
+
+        ThriftValidation.validateColumnParent(keyspace, column_parent);
         ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
-        if (!StorageService.getPartitioner().preservesOrder())
-        {
-            throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
-        }
-        if (maxRows <= 0)
-        {
-            throw new InvalidRequestException("maxRows must be positive");
-        }
+        ThriftValidation.validateKeyRange(start_key, end_key, maxRows);
 
-        List<Pair<String, ColumnFamily>> rows;
+        List<Row> rows;
         try
         {
-            DecoratedKey startKey = StorageService.getPartitioner().decorateKey(start_key);
-            DecoratedKey finishKey = StorageService.getPartitioner().decorateKey(finish_key);
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, startKey, finishKey, maxRows), consistency_level);
+            IPartitioner p = StorageService.getPartitioner();
+            Bounds bounds = new Bounds(p.decorateKey(start_key).token, p.decorateKey(end_key).token);
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, maxRows), consistency_level);
             assert rows != null;
         }
+        catch (TimeoutException e)
+        {
+        	throw new TimedOutException();
+        }
         catch (IOException e)
         {
             throw new RuntimeException(e);
@@ -495,10 +496,10 @@
 
         List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
         boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
-        for (Pair<String, ColumnFamily> row : rows)
+        for (Row row : rows)
         {
-            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.right, column_parent.super_column != null, reversed);
-            keySlices.add(new KeySlice(row.left, thriftifiedColumns));
+            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.cf, column_parent.super_column != null, reversed);
+            keySlices.add(new KeySlice(row.key, thriftifiedColumns));
         }
 
         return keySlices;

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Feb 23 17:05:46 2010
@@ -24,63 +24,78 @@
 
 import org.apache.log4j.Logger;
 
+import org.apache.commons.collections.iterators.CollatingIterator;
+
+import com.google.common.collect.AbstractIterator;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RangeSliceReply;
 import org.apache.cassandra.db.Row;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.ReducingIterator;
 
 /**
  * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
  * to the most recent ColumnFamily and setting up read repairs as necessary.
  */
-public class RangeSliceResponseResolver implements IResponseResolver<Map<String, ColumnFamily>>
+public class RangeSliceResponseResolver implements IResponseResolver<List<Row>>
 {
     private static final Logger logger_ = Logger.getLogger(RangeSliceResponseResolver.class);
     private final String table;
-    private final Range range;
     private final List<InetAddress> sources;
-    private boolean isCompleted;
 
-    public RangeSliceResponseResolver(String table, Range range, List<InetAddress> sources)
+    public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
         assert sources.size() > 0;
         this.sources = sources;
-        this.range = range;
         this.table = table;
     }
 
-    public Map<String, ColumnFamily> resolve(List<Message> responses) throws DigestMismatchException, IOException
+    public List<Row> resolve(List<Message> responses) throws DigestMismatchException, IOException
     {
-        Map<InetAddress, Map<String, ColumnFamily>> replies = new HashMap<InetAddress, Map<String, ColumnFamily>>(responses.size());
-        Set<String> allKeys = new HashSet<String>();
-        for (Message response : responses)
+        CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
         {
-            RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
-            isCompleted &= reply.rangeCompletedLocally;
-            Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(reply.rows.size());
-            for (Row row : reply.rows)
+            public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> o2)
             {
-                rows.put(row.key, row.cf);
-                allKeys.add(row.key);
+                return o1.left.key.compareTo(o2.left.key);
             }
-            replies.put(response.getFrom(), rows);
+        });
+        
+        int n = 0;
+        for (Message response : responses)
+        {
+            RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+            n = Math.max(n, reply.rows.size());
+            collator.addIterator(new RowIterator(reply.rows.iterator(), response.getFrom()));
         }
 
         // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        // TODO since the rows all arrive in sorted order, we should be able to do this more efficiently w/o all the Map conversion
-        Map<String, ColumnFamily> resolvedRows = new HashMap<String, ColumnFamily>(allKeys.size());
-        for (String key : allKeys)
+        ReducingIterator<Pair<Row,InetAddress>, Row> iter = new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
         {
             List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
-            for (InetAddress endpoint : sources)
+            List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
+            String key;
+
+            public void reduce(Pair<Row,InetAddress> current)
             {
-                versions.add(replies.get(endpoint).get(key));
+                key = current.left.key;
+                versions.add(current.left.cf);
+                versionSources.add(current.right);
             }
-            ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
-            ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, sources);
-            resolvedRows.put(key, resolved);
-        }
+
+            protected Row getReduced()
+            {
+                ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
+                ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+                versions.clear();
+                return new Row(key, resolved);
+            }
+        };
+
+        List<Row> resolvedRows = new ArrayList<Row>(n);
+        while (iter.hasNext())
+            resolvedRows.add(iter.next());
+
         return resolvedRows;
     }
 
@@ -89,11 +104,21 @@
         return responses.size() >= sources.size();
     }
 
-    /**
-     * only valid after resolve has been called (typically via QRH.get)
-     */
-    public boolean completed()
+    private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>>
     {
-        return isCompleted;
+        private final Iterator<Row> iter;
+        private final InetAddress source;
+
+        private RowIterator(Iterator<Row> iter, InetAddress source)
+        {
+            this.iter = iter;
+            this.source = source;
+        }
+
+        @Override
+        protected Pair<Row,InetAddress> computeNext()
+        {
+            return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), source) : endOfData();
+        }
     }
 }
\ No newline at end of file

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Tue Feb 23 17:05:46 2010
@@ -18,9 +18,7 @@
 
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.db.RangeSliceCommand;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -36,13 +34,13 @@
         try
         {
             RangeSliceCommand command = RangeSliceCommand.read(message);
-            RangeSliceReply reply = Table.open(command.keyspace).getColumnFamilyStore(command.column_family).getRangeSlice(
-                    command.super_column,
-                    command.startKey,
-                    command.finishKey,
-                    command.max_keys,
-                    command.predicate.slice_range,
-                    command.predicate.column_names);
+            ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+            RangeSliceReply reply = cfs.getRangeSlice(command.super_column,
+                                                      new DecoratedKey(command.range.left, null),
+                                                      new DecoratedKey(command.range.right, null),
+                                                      command.max_keys,
+                                                      command.predicate.slice_range,
+                                                      command.predicate.column_names);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageProxy.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 23 17:05:46 2010
@@ -28,8 +28,10 @@
 
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.collect.AbstractIterator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.DataInputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncResult;
@@ -39,8 +41,6 @@
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.concurrent.StageManager;
 
@@ -512,86 +512,153 @@
         return rows;
     }
 
-    static List<Pair<String, ColumnFamily>> getRangeSlice(RangeSliceCommand command, int consistency_level) throws IOException, UnavailableException, TimedOutException
+    public static List<Row> getRangeSlice(RangeSliceCommand command, int consistency_level)
+    throws IOException, UnavailableException, TimeoutException
     {
+        if (logger.isDebugEnabled())
+            logger.debug(command);
         long startTime = System.currentTimeMillis();
-        TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
 
-        InetAddress endPoint = StorageService.instance().getPrimary(command.startKey.token);
-        InetAddress startEndpoint = endPoint;
         int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
 
-        Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
-        do
-        {
-            Range primaryRange = StorageService.instance().getPrimaryRangeForEndPoint(endPoint);
-            List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(primaryRange.right());
-            if (endpoints.size() < responseCount)
-                throw new UnavailableException();
+        List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace, responseCount);
 
-            // to make comparing the results from each node easy, we restrict each command to the data in the primary range for this iteration
-            DecoratedKey startKey;
-            DecoratedKey finishKey;
-            if (primaryRange.left().equals(primaryRange.right()))
-            {
-                startKey = command.startKey;
-                finishKey = command.finishKey;
-            }
-            else
-            {
-                startKey = Collections.max(Arrays.asList(command.startKey, new DecoratedKey(primaryRange.left(), null)));
-                finishKey = command.finishKey.isEmpty()
-                          ? new DecoratedKey(primaryRange.right(), null)
-                          : Collections.min(Arrays.asList(command.finishKey, new DecoratedKey(primaryRange.right(), null)));
-            }
-            RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, startKey, finishKey, command.max_keys);
+        // now scan until we have enough results
+        List<Row> rows = new ArrayList<Row>(command.max_keys);
+        for (Pair<AbstractBounds, List<InetAddress>> pair : getRangeIterator(ranges, command.range.left))
+        {
+            AbstractBounds range = pair.left;
+            List<InetAddress> endpoints = pair.right;
+            RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
             Message message = c2.getMessage();
 
             // collect replies and resolve according to consistency level
-            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
-            QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
-            if (logger.isDebugEnabled())
-                logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
-            for (InetAddress replicaEndpoint : endpoints)
+            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
+            QuorumResponseHandler<List<Row>> handler = new QuorumResponseHandler<List<Row>>(responseCount, resolver);
+
+            for (InetAddress endpoint : endpoints)
             {
-                MessagingService.instance().sendRR(message, replicaEndpoint, handler);
+                MessagingService.instance().sendRR(message, endpoint, handler);
+                if (logger.isDebugEnabled())
+                    logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
             }
+            // TODO read repair on remaining replicas?
 
             // if we're done, great, otherwise, move to the next range
             try
             {
-                rows.putAll(handler.get());
-            }
-            catch (TimeoutException e)
-            {
-                throw new TimedOutException();
+                if (logger.isDebugEnabled())
+                {
+                    for (Row row : handler.get())
+                    {
+                        logger.debug("range slices read " + row.key);
+                    }
+                }
+                rows.addAll(handler.get());
             }
             catch (DigestMismatchException e)
             {
                 throw new AssertionError(e); // no digests in range slices yet
             }
-            if (rows.size() >= command.max_keys || resolver.completed())
+            if (rows.size() >= command.max_keys)
                 break;
-
-            endPoint = tokenMetadata.getSuccessor(endPoint);
         }
-        while (!endPoint.equals(startEndpoint));
 
-        List<Pair<String, ColumnFamily>> results = new ArrayList<Pair<String, ColumnFamily>>(rows.size());
-        for (Map.Entry<String, ColumnFamily> entry : rows.entrySet())
+        rangeStats.add(System.currentTimeMillis() - startTime);
+        return rows.size() > command.max_keys ? rows.subList(0, command.max_keys) : rows;
+    }
+
+    /**
+     * sorts ranges in place by left token and returns an Iterable over them in ring order, starting with the one that contains the start token (or whose left bound equals it)
+     */
+    private static Iterable<Pair<AbstractBounds, List<InetAddress>>> getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges, Token start)
+    {
+        // sort ranges in ring order
+        Comparator<Pair<AbstractBounds, List<InetAddress>>> comparator = new Comparator<Pair<AbstractBounds, List<InetAddress>>>()
         {
-            ColumnFamily cf = entry.getValue();
-            results.add(new Pair<String, ColumnFamily>(entry.getKey(), cf));
+            public int compare(Pair<AbstractBounds, List<InetAddress>> o1, Pair<AbstractBounds, List<InetAddress>> o2)
+            {
+                // no restricted ranges will overlap so we don't need to worry about inclusive vs exclusive left,
+                // just sort by raw token position.
+                return o1.left.left.compareTo(o2.left.left);
+            }
+        };
+        Collections.sort(ranges, comparator);
+
+        // find the one to start with
+        int i;
+        for (i = 0; i < ranges.size(); i++)
+        {
+            AbstractBounds range = ranges.get(i).left;
+            if (range.contains(start) || range.left.equals(start))
+                break;
         }
-        Collections.sort(results, new Comparator<Pair<String, ColumnFamily>>()
+        AbstractBounds range = ranges.get(i).left;
+        assert range.contains(start) || range.left.equals(start); // make sure the loop didn't just end b/c ranges were exhausted
+
+        // return an iterable that starts w/ the correct range and iterates the rest in ring order
+        final int begin = i;
+        return new Iterable<Pair<AbstractBounds, List<InetAddress>>>()
         {
-            public int compare(Pair<String, ColumnFamily> o1, Pair<String, ColumnFamily> o2)
+            public Iterator<Pair<AbstractBounds, List<InetAddress>>> iterator()
             {
-                return keyComparator.compare(o1.left, o2.left);                
+                return new AbstractIterator<Pair<AbstractBounds, List<InetAddress>>>()
+                {
+                    int n = 0;
+
+                    protected Pair<AbstractBounds, List<InetAddress>> computeNext()
+                    {
+                        if (n == ranges.size())
+                            return endOfData();
+                        return ranges.get((begin + n++) % ranges.size());
+                    }
+                };
             }
-        });
-        rangeStats.add(System.currentTimeMillis() - startTime);
-        return results;
+        };
+    }
+
+    /**
+     * compute all ranges we're going to query, in sorted order, so that we get the correct results back.
+     *  1) computing range intersections is necessary because nodes can be replica destinations for many ranges,
+     *     so if we do not restrict each scan to the specific range we want we will get duplicate results.
+     *  2) sorting the intersection ranges is necessary because wraparound node ranges can be discontiguous.
+     *     Consider a 2-node ring, (D, T] and (T, D]. A query for [A, Z] will intersect the 2nd node twice,
+     *     at [A, D] and (T, Z]. We need to scan the (D, T] range in between those, or we will skip those
+     *     results entirely if the limit is low enough.
+     *  3) we unwrap the intersection ranges because otherwise we get results in the wrong order.
+     *     Consider a 2-node ring, (D, T] and (T, D].  A query for [D, Z] will get results in the wrong
+     *     order if we use (T, D] directly -- we need to start with that range, because our query starts with
+     *     D, but we don't want any other results from it until after the (D, T] range.  Unwrapping so that
+     *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
+     */
+    private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace, int responseCount)
+    throws UnavailableException
+    {
+        TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
+        Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left);
+        List<Pair<AbstractBounds, List<InetAddress>>> ranges = new ArrayList<Pair<AbstractBounds, List<InetAddress>>>();
+        while (iter.hasNext())
+        {
+            Token nodeToken = iter.next();
+            Range nodeRange = new Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
+            List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(nodeToken);
+            if (endpoints.size() < responseCount)
+                throw new UnavailableException();
+
+            DatabaseDescriptor.getEndPointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+            List<InetAddress> endpointsForCL = endpoints.subList(0, responseCount);
+            Set<AbstractBounds> restrictedRanges = queryRange.restrictTo(nodeRange);
+            for (AbstractBounds range : restrictedRanges)
+            {
+                for (AbstractBounds unwrapped : range.unwrap())
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Adding to restricted ranges " + unwrapped + " for " + nodeRange);
+                    ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpointsForCL));
+                }
+            }
+        }
+        return ranges;
     }
 
     static List<String> getKeyRange(RangeCommand command) throws IOException, UnavailableException, TimedOutException

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java Tue Feb 23 17:05:46 2010
@@ -399,7 +399,7 @@
         Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
         for (Range range : ranges)
         {
-            rangeToEndPointMap.put(range, replicationStrategy_.getNaturalEndpoints(range.right()));
+            rangeToEndPointMap.put(range, replicationStrategy_.getNaturalEndpoints(range.right));
         }
         return rangeToEndPointMap;
     }
@@ -628,8 +628,8 @@
         // all leaving nodes are gone.
         for (Range range : affectedRanges)
         {
-            List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right(), tm);
-            List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right(), allLeftMetadata);
+            List<InetAddress> currentEndPoints = strategy.getNaturalEndpoints(range.right, tm);
+            List<InetAddress> newEndPoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata);
             newEndPoints.removeAll(currentEndPoints);
             pendingRanges.putAll(range, newEndPoints);
         }
@@ -734,7 +734,7 @@
 
         // Find (for each range) all nodes that store replicas for these ranges as well
         for (Range range : ranges)
-            currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
+            currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right, tokenMetadata_));
 
         TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
 
@@ -752,7 +752,7 @@
         // range.
         for (Range range : ranges)
         {
-            ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right(), temp);
+            ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right, temp);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger_.isDebugEnabled())
                 if (newReplicaEndpoints.isEmpty())
@@ -1232,7 +1232,7 @@
         // (we're only operating on 1/128 of the keys remember)
         Range range = getLocalPrimaryRange();
         List<String> tokens = new ArrayList<String>();
-        tokens.add(range.left().toString());
+        tokens.add(range.left.toString());
 
         List<DecoratedKey> decoratedKeys = SSTableReader.getIndexedDecoratedKeys();
         if (decoratedKeys.size() < splits)
@@ -1253,7 +1253,7 @@
             }
         }
 
-        tokens.add(range.right().toString());
+        tokens.add(range.right.toString());
         return tokens;
     }
 

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/ThriftValidation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/ThriftValidation.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/ThriftValidation.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/ThriftValidation.java Tue Feb 23 17:05:46 2010
@@ -32,6 +32,9 @@
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
 
 public class ThriftValidation
 {
@@ -219,4 +222,32 @@
         else
             validateColumns(keyspace, column_parent, predicate.column_names);
     }
+
+    /**
+     * Validates the arguments of a key-range request.
+     *
+     * The start key's token may only sort after the end key's token when the end token is the
+     * partitioner's minimum token; row_count must be strictly positive.
+     *
+     * @throws InvalidRequestException if the key order or row_count is not acceptable
+     */
+    public static void validateKeyRange(String start_key, String end_key, int row_count) throws InvalidRequestException
+    {
+        IPartitioner partitioner = StorageService.getPartitioner();
+        Token first = partitioner.decorateKey(start_key).token;
+        Token last = partitioner.decorateKey(end_key).token;
+        boolean startAfterEnd = first.compareTo(last) > 0;
+        if (startAfterEnd && !last.equals(partitioner.getMinimumToken()))
+        {
+            String reason = partitioner instanceof RandomPartitioner
+                          ? "start key's md5 sorts after end key's md5.  this is not allowed; you probably should not specify end key at all, under RandomPartitioner"
+                          : "start key must sort before (or equal to) finish key in your partitioner!";
+            throw new InvalidRequestException(reason);
+        }
+
+        if (row_count < 1)
+        {
+            throw new InvalidRequestException("row_count must be positive");
+        }
+    }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Feb 23 17:05:46 2010
@@ -215,7 +215,7 @@
         // emphasize that we're showing the right part of each range
         if (ranges.size() > 1)
         {
-            outs.println(String.format("%-14s%-11s%-14s%-43s", "", "", "", ranges.get(0).left()));
+            outs.println(String.format("%-14s%-11s%-14s%-43s", "", "", "", ranges.get(0).left));
         }
         // normal range & node info
         for (Range range : ranges) {
@@ -234,7 +234,7 @@
             String load = loadMap.containsKey(primaryEndpoint) ? loadMap.get(primaryEndpoint) : "?";
             outs.print(String.format("%-14s", load));
 
-            outs.print(String.format("%-43s", range.right()));
+            outs.print(String.format("%-43s", range.right));
 
             String asciiRingArt;
             if (counter == 0)

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Feb 23 17:05:46 2010
@@ -33,6 +33,10 @@
 import org.apache.commons.collections.iterators.CollatingIterator;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 
 public class FBUtilities
 {
@@ -318,4 +322,63 @@
             }
         });
     }
+
+    /**
+     * Writes a Thrift struct to the output as a 4-byte length followed by its serialized bytes.
+     *
+     * @param serializer Thrift serializer used to encode the struct; asserted non-null
+     * @param struct Thrift object to encode; asserted non-null
+     * @param out destination of the length prefix and payload; asserted non-null
+     * @throws IOException if writing to out fails
+     * @throws RuntimeException wrapping the TException raised when the struct cannot be serialized
+     */
+    public static void serialize(TSerializer serializer, TBase struct, DataOutput out)
+    throws IOException
+    {
+        assert serializer != null;
+        assert struct != null;
+        assert out != null;
+        byte[] bytes;
+        try
+        {
+            bytes = serializer.serialize(struct);
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        out.writeInt(bytes.length);
+        out.write(bytes);
+    }
+
+    /**
+     * Reads a length-prefixed block of bytes from the input and decodes it into the given Thrift struct.
+     *
+     * @param deserializer Thrift deserializer used to decode the bytes; asserted non-null
+     * @param struct Thrift object to populate; asserted non-null
+     * @param in source of the length prefix and payload; asserted non-null
+     * @throws IOException if reading fails, the length prefix is negative, or the bytes cannot be decoded
+     */
+    public static void deserialize(TDeserializer deserializer, TBase struct, DataInput in)
+    throws IOException
+    {
+        assert deserializer != null;
+        assert struct != null;
+        assert in != null;
+        int length = in.readInt();
+        // a negative prefix can only come from a corrupt or misaligned stream; report it as an I/O
+        // problem instead of letting the array allocation fail with NegativeArraySizeException
+        if (length < 0)
+            throw new IOException("Invalid serialized struct length: " + length);
+        byte[] bytes = new byte[length];
+        in.readFully(bytes);
+        try
+        {
+            deserializer.deserialize(struct, bytes);
+        }
+        catch (TException ex)
+        {
+            throw new IOException(ex);
+        }
+    }
 }

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/MerkleTree.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/MerkleTree.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/MerkleTree.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/utils/MerkleTree.java Tue Feb 23 17:05:46 2010
@@ -186,9 +186,9 @@
      */
     static int differenceHelper(MerkleTree ltree, MerkleTree rtree, List<TreeRange> diff, TreeRange active)
     {
-        Token midpoint = ltree.partitioner().midpoint(active.left(), active.right());
-        TreeRange left = new TreeRange(null, active.left(), midpoint, inc(active.depth), null);
-        TreeRange right = new TreeRange(null, midpoint, active.right(), inc(active.depth), null);
+        Token midpoint = ltree.partitioner().midpoint(active.left, active.right);
+        TreeRange left = new TreeRange(null, active.left, midpoint, inc(active.depth), null);
+        TreeRange right = new TreeRange(null, midpoint, active.right, inc(active.depth), null);
         byte[] lhash;
         byte[] rhash;
         
@@ -320,8 +320,8 @@
         // else: node.
         
         Inner node = (Inner)hashable;
-        Range leftactive = new Range(active.left(), node.token);
-        Range rightactive = new Range(node.token, active.right());
+        Range leftactive = new Range(active.left, node.token);
+        Range rightactive = new Range(node.token, active.right);
 
         if (range.contains(active))
         {
@@ -558,8 +558,8 @@
 
                 Inner node = (Inner)active.hashable;
                 // push intersecting children onto the stack
-                TreeRange left = new TreeRange(tree, active.left(), node.token, inc(active.depth), node.lchild);
-                TreeRange right = new TreeRange(tree, node.token, active.right(), inc(active.depth), node.rchild);
+                TreeRange left = new TreeRange(tree, active.left, node.token, inc(active.depth), node.lchild);
+                TreeRange right = new TreeRange(tree, node.token, active.right, inc(active.depth), node.rchild);
                 if (right.intersects(range))
                     tovisit.push(right);
                 if (left.intersects(range))

Modified: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=915437&r1=915436&r2=915437&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue Feb 23 17:05:46 2010
@@ -60,7 +60,7 @@
 
         InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
         Range range3 = ss.getPrimaryRangeForEndPoint(three);
-        Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(), range3.right());
+        Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left, range3.right);
         assert range3.contains(fakeToken);
         ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken)));
         tmd = ss.getTokenMetadata();

Added: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BoundsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BoundsTest.java?rev=915437&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BoundsTest.java (added)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BoundsTest.java Tue Feb 23 17:05:46 2010
@@ -0,0 +1,81 @@
+package org.apache.cassandra.dht;
+
+import java.util.*;
+
+import junit.framework.TestCase;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Exercises Bounds.restrictTo with StringToken ranges under OrderPreservingPartitioner.
+ */
+public class BoundsTest extends TestCase
+{
+    /** Restricts bounds that end at the minimum token to the ranges (m, e) and (e, m). */
+    public void testRestrictTo() throws Exception
+    {
+        IPartitioner p = new OrderPreservingPartitioner();
+        Token min = p.getMinimumToken();
+        Range wraps = new Range(new StringToken("m"), new StringToken("e"));
+        Range normal = new Range(wraps.right, wraps.left);
+        Bounds all = new Bounds(min, min, p);
+        Bounds almostAll = new Bounds(new StringToken("a"), min, p);
+
+        Set<AbstractBounds> S;
+        Set<AbstractBounds> S2;
+
+        S = all.restrictTo(wraps);
+        assert S.equals(new HashSet<AbstractBounds>(Arrays.asList(wraps)));
+
+        S = almostAll.restrictTo(wraps);
+        S2 = new HashSet<AbstractBounds>(Arrays.asList(new Bounds(new StringToken("a"), new StringToken("e"), p),
+                                                       new Range(new StringToken("m"), min)));
+        assert S.equals(S2);
+
+        S = all.restrictTo(normal);
+        assert S.equals(new HashSet<AbstractBounds>(Arrays.asList(normal)));
+    }
+
+    /** Bounds (m, n) and (b, z) are expected to restrict to the empty set for the node range (z, a). */
+    public void testNoIntersectionWrapped()
+    {
+        IPartitioner p = new OrderPreservingPartitioner();
+        Range node = new Range(new StringToken("z"), new StringToken("a"));
+        Bounds bounds;
+
+        bounds = new Bounds(new StringToken("m"), new StringToken("n"), p);
+        assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet());
+
+        bounds = new Bounds(new StringToken("b"), node.left, p);
+        assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet());
+    }
+
+    /** Bounds (b, c) are expected to come back unchanged when restricted to the node range (d, d). */
+    public void testSmallBoundsFullRange()
+    {
+        IPartitioner p = new OrderPreservingPartitioner();
+        Range node;
+        Bounds bounds = new Bounds(new StringToken("b"), new StringToken("c"), p);
+
+        node = new Range(new StringToken("d"), new StringToken("d"));
+        // typed set rather than a raw HashSet, matching the other assertions and avoiding unchecked warnings
+        assert bounds.restrictTo(node).equals(new HashSet<AbstractBounds>(Arrays.<AbstractBounds>asList(bounds)));
+    }
+
+    /** Bounds (z, min), (a, m) and (min, b) are expected to restrict to the empty set for the node range (m, n). */
+    public void testNoIntersectionUnwrapped()
+    {
+        IPartitioner p = new OrderPreservingPartitioner();
+        Token min = p.getMinimumToken();
+        Range node = new Range(new StringToken("m"), new StringToken("n"));
+        Bounds bounds;
+
+        bounds = new Bounds(new StringToken("z"), min, p);
+        assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet());
+
+        bounds = new Bounds(new StringToken("a"), node.left, p);
+        assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet());
+
+        bounds = new Bounds(min, new StringToken("b"), p);
+        assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet());
+    }
+}

Added: incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java?rev=915437&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java (added)
+++ incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java Tue Feb 23 17:05:46 2010
@@ -0,0 +1,121 @@
+package org.apache.cassandra.dht;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.junit.Test;
+
+/**
+ * Checks Range.intersectionWith for combinations of Ranges built from BigIntegerTokens.
+ */
+public class RangeIntersectionTest
+{
+    /** Builds a Range from the string forms of its left and right BigIntegerTokens. */
+    private static Range range(String left, String right)
+    {
+        return new Range(new BigIntegerToken(left), new BigIntegerToken(right));
+    }
+
+    /** Asserts that intersecting the two ranges, in either order, gives exactly the listed ranges. */
+    static void assertIntersection(Range one, Range two, Range ... ranges)
+    {
+        Set<Range> expected = Range.rangeSet(ranges);
+        checkIntersection(one, two, expected);
+        checkIntersection(two, one, expected);
+    }
+
+    /** Asserts that first.intersectionWith(second) equals the expected set. */
+    private static void checkIntersection(Range first, Range second, Set<Range> expected)
+    {
+        Set<Range> actual = first.intersectionWith(second);
+        assert actual.equals(expected) : String.format("%s != %s",
+                                                       StringUtils.join(actual, ","),
+                                                       StringUtils.join(expected, ","));
+    }
+
+    /** Asserts the intersection of the two ranges against an empty list of expected ranges. */
+    private void assertNoIntersection(Range one, Range two)
+    {
+        assertIntersection(one, two);
+    }
+
+    @Test
+    public void testIntersectionWithAll()
+    {
+        Range wraps = range("100", "10");
+
+        // ranges whose left and right tokens are the same
+        for (String token : new String[]{ "0", "10", "100", "1000" })
+            assertIntersection(range(token, token), wraps, wraps);
+    }
+
+    @Test
+    public void testIntersectionContains()
+    {
+        Range wraps1 = range("100", "10");
+        Range wraps2 = range("90", "20");
+        Range wraps3 = range("90", "0");
+        Range nowrap1 = range("100", "110");
+        Range nowrap2 = range("0", "10");
+        Range nowrap3 = range("0", "9");
+
+        assertIntersection(wraps1, wraps2, wraps1);
+        assertIntersection(wraps3, wraps2, wraps3);
+
+        assertIntersection(wraps1, nowrap1, nowrap1);
+        assertIntersection(wraps1, nowrap2, nowrap2);
+        assertIntersection(nowrap2, nowrap3, nowrap3);
+
+        // each range intersected with itself
+        assertIntersection(wraps1, wraps1, wraps1);
+        assertIntersection(nowrap1, nowrap1, nowrap1);
+        assertIntersection(nowrap2, nowrap2, nowrap2);
+        assertIntersection(wraps3, wraps3, wraps3);
+    }
+
+    @Test
+    public void testNoIntersection()
+    {
+        Range wraps1 = range("100", "10");
+        Range wraps2 = range("100", "0");
+        Range nowrap1 = range("0", "100");
+        Range nowrap2 = range("100", "200");
+        Range nowrap3 = range("10", "100");
+
+        assertNoIntersection(wraps1, nowrap3);
+        assertNoIntersection(wraps2, nowrap1);
+        assertNoIntersection(nowrap1, nowrap2);
+    }
+
+    @Test
+    public void testIntersectionOneWraps()
+    {
+        Range wraps1 = range("100", "10");
+        Range wraps2 = range("100", "0");
+        Range nowrap1 = range("0", "200");
+        Range nowrap2 = range("0", "100");
+
+        assertIntersection(wraps1, nowrap1, range("0", "10"), range("100", "200"));
+        assertIntersection(wraps2, nowrap1, range("100", "200"));
+        assertIntersection(wraps1, nowrap2, range("0", "10"));
+    }
+
+    @Test
+    public void testIntersectionTwoWraps()
+    {
+        Range wraps1 = range("100", "20");
+        Range wraps2 = range("120", "90");
+        Range wraps3 = range("120", "110");
+        Range wraps4 = range("10", "0");
+        Range wraps5 = range("10", "1");
+        Range wraps6 = range("30", "10");
+
+        assertIntersection(wraps1, wraps2, range("120", "20"));
+        assertIntersection(wraps1, wraps3, range("120", "20"), range("100", "110"));
+        assertIntersection(wraps1, wraps4, range("10", "20"), range("100", "0"));
+        assertIntersection(wraps1, wraps5, range("10", "20"), range("100", "1"));
+        assertIntersection(wraps1, wraps6, range("100", "10"));
+    }
+}