You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/06/18 17:23:12 UTC

[2/3] git commit: Support multiple ranges in SliceQueryFilter

Support multiple ranges in SliceQueryFilter

patch by dr-alves and slebresne; reviewed by vijay, dr-alves and slebresne for CASSANDRA-3885


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d1171ddc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d1171ddc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d1171ddc

Branch: refs/heads/trunk
Commit: d1171ddc10e75d34f7fb503accc718acc5720512
Parents: 8ea2d2a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jun 18 17:20:00 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Jun 18 17:21:56 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/cql/QueryProcessor.java   |    2 +-
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   19 +-
 .../cassandra/cql3/statements/SelectStatement.java |   12 +-
 .../cassandra/db/AbstractColumnContainer.java      |   14 +-
 .../cassandra/db/ArrayBackedSortedColumns.java     |  116 +++--
 .../apache/cassandra/db/AtomicSortedColumns.java   |   14 +-
 .../org/apache/cassandra/db/ISortedColumns.java    |   19 +-
 src/java/org/apache/cassandra/db/Memtable.java     |    4 +-
 .../org/apache/cassandra/db/RangeSliceCommand.java |    2 +-
 .../apache/cassandra/db/SliceFromReadCommand.java  |    2 +-
 .../cassandra/db/ThreadSafeSortedColumns.java      |   14 +-
 .../cassandra/db/TreeMapBackedSortedColumns.java   |   14 +-
 .../db/columniterator/IndexedSliceReader.java      |  394 +++++++++++----
 .../db/columniterator/SSTableNamesIterator.java    |    9 +-
 .../db/columniterator/SSTableSliceIterator.java    |   19 +-
 .../apache/cassandra/db/filter/ColumnSlice.java    |  186 +++++++
 .../apache/cassandra/db/filter/ExtendedFilter.java |    9 +-
 .../cassandra/db/filter/SliceQueryFilter.java      |   99 +++-
 .../apache/cassandra/io/sstable/IndexHelper.java   |   20 +-
 .../org/apache/cassandra/utils/ByteBufferUtil.java |    3 +
 .../serialization/1.2/db.RangeSliceCommand.bin     |  Bin 701 -> 717 bytes
 test/data/serialization/1.2/db.Row.bin             |  Bin 527 -> 527 bytes
 test/data/serialization/1.2/db.RowMutation.bin     |  Bin 3410 -> 3410 bytes
 .../serialization/1.2/db.SliceFromReadCommand.bin  |  Bin 361 -> 385 bytes
 test/data/serialization/1.2/gms.EndpointState.bin  |  Bin 110 -> 110 bytes
 .../serialization/1.2/service.TreeResponse.bin     |  Bin 930 -> 930 bytes
 test/data/serialization/1.2/utils.BloomFilter.bin  |  Bin 2500016 -> 2500016 bytes
 .../cassandra/db/ArrayBackedSortedColumnsTest.java |   12 +-
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |  418 ++++++++++++++-
 test/unit/org/apache/cassandra/db/TableTest.java   |    3 -
 .../cassandra/io/sstable/IndexHelperTest.java      |   20 +-
 32 files changed, 1132 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7064eb0..5bea5f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
    (CASSANDRA-4293)
  * fix Summary component and caches to use correct partitioner (CASSANDRA-4289)
  * stream compressed sstables directly with java nio (CASSANDRA-4297)
+ * Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
 
 
 1.1.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 101c21c..2be9090 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -394,7 +394,7 @@ public class QueryProcessor
     private static void validateSliceFilter(CFMetaData metadata, SliceQueryFilter range)
     throws InvalidRequestException
     {
-        validateSliceFilter(metadata, range.start, range.finish, range.reversed);
+        validateSliceFilter(metadata, range.start(), range.finish(), range.reversed);
     }
 
     private static void validateSliceFilter(CFMetaData metadata, ByteBuffer start, ByteBuffer finish, boolean reversed)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index c74be16..f1da676 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -88,16 +88,15 @@ public class QueryProcessor
     public static void validateSliceFilter(CFMetaData metadata, SliceQueryFilter range)
     throws InvalidRequestException
     {
-        validateSliceFilter(metadata, range.start, range.finish, range.reversed);
-    }
-
-    public static void validateSliceFilter(CFMetaData metadata, ByteBuffer start, ByteBuffer finish, boolean reversed)
-    throws InvalidRequestException
-    {
-        AbstractType<?> comparator = metadata.getComparatorFor(null);
-        Comparator<ByteBuffer> orderedComparator = reversed ? comparator.reverseComparator: comparator;
-        if (start.remaining() > 0 && finish.remaining() > 0 && orderedComparator.compare(start, finish) > 0)
-            throw new InvalidRequestException("Range finish must come after start in traversal order");
+        try
+        {
+            AbstractType<?> comparator = metadata.getComparatorFor(null);
+            ColumnSlice.validate(range.slices, comparator, range.reversed);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new InvalidRequestException(e.getMessage());
+        }
     }
 
     private static CqlResult processStatement(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 9fb9bd7..784389f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -195,20 +195,16 @@ public class SelectStatement implements CQLStatement
             ByteBuffer start = getRequestedBound(isReversed ? Bound.END : Bound.START, variables);
             ByteBuffer finish = getRequestedBound(isReversed ? Bound.START : Bound.END, variables);
 
+            SliceQueryFilter filter = new SliceQueryFilter(start, finish, isReversed, getLimit());
+            QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
+
             // Note that we use the total limit for every key. This is
             // potentially inefficient, but then again, IN + LIMIT is not a
             // very sensible choice
             for (ByteBuffer key : keys)
             {
                 QueryProcessor.validateKey(key);
-                QueryProcessor.validateSliceFilter(cfDef.cfm, start, finish, isReversed);
-                commands.add(new SliceFromReadCommand(keyspace(),
-                                                      key,
-                                                      queryPath,
-                                                      start,
-                                                      finish,
-                                                      isReversed,
-                                                      getLimit()));
+                commands.add(new SliceFromReadCommand(keyspace(), key, queryPath, filter));
             }
         }
         // ...of a list of column names

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index 9b04a5b..62ed353 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -27,6 +27,7 @@ import com.google.common.base.Functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.Allocator;
@@ -182,19 +183,14 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
         return columns.iterator();
     }
 
-    public Iterator<IColumn> reverseIterator()
+    public Iterator<IColumn> iterator(ColumnSlice[] slices)
     {
-        return columns.reverseIterator();
+        return columns.iterator(slices);
     }
 
-    public Iterator<IColumn> iterator(ByteBuffer start)
+    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
     {
-        return columns.iterator(start);
-    }
-
-    public Iterator<IColumn> reverseIterator(ByteBuffer start)
-    {
-        return columns.reverseIterator(start);
+        return columns.reverseIterator(slices);
     }
 
     public boolean hasExpiredTombstones(int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index 622d008..3bbf49b 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -21,9 +21,13 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.base.Function;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * A ISortedColumns backed by an ArrayList.
@@ -91,12 +95,9 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return reversed;
     }
 
-    private int compare(ByteBuffer name1, ByteBuffer name2)
+    private Comparator<ByteBuffer> internalComparator()
     {
-        if (reversed)
-            return comparator.reverseComparator.compare(name1, name2);
-        else
-            return comparator.compare(name1, name2);
+        return reversed ? comparator.reverseComparator : comparator;
     }
 
     public IColumn getColumn(ByteBuffer name)
@@ -124,7 +125,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         }
 
         // Fast path if inserting at the tail
-        int c = compare(columns.get(size() - 1).name(), column.name());
+        int c = internalComparator().compare(columns.get(size() - 1).name(), column.name());
         // note that we want an assertion here (see addColumn javadoc), but we also want that if
         // assertion are disabled, addColumn works correctly with unsorted input
         assert c <= 0 : "Added column does not sort as the " + (reversed ? "first" : "last") + " column";
@@ -170,22 +171,27 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         }
     }
 
+    private int binarySearch(ByteBuffer name)
+    {
+        return binarySearch(columns, internalComparator(), name, 0);
+    }
+
     /**
      * Simple binary search for a given column name.
      * The return value has the exact same meaning that the one of Collections.binarySearch().
      * (We don't use Collections.binarySearch() directly because it would require us to create
-     * a fake IColumn (as well an IColumn comparator) to do the search, which is ugly.
+     * a fake IColumn (as well as an IColumn comparator) to do the search, which is ugly.
      */
-    private int binarySearch(ByteBuffer name)
+    private static int binarySearch(List<IColumn> columns, Comparator<ByteBuffer> comparator, ByteBuffer name, int start)
     {
-        int low = 0;
-        int mid = size();
+        int low = start;
+        int mid = columns.size();
         int high = mid - 1;
         int result = -1;
         while (low <= high)
         {
             mid = (low + high) >> 1;
-            if ((result = compare(name, columns.get(mid).name())) > 0)
+            if ((result = comparator.compare(name, columns.get(mid).name())) > 0)
             {
                 low = mid + 1;
             }
@@ -208,14 +214,14 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 
         IColumn[] copy = columns.toArray(new IColumn[size()]);
         int idx = 0;
-        Iterator<IColumn> other = reversed ? cm.reverseIterator() : cm.iterator();
+        Iterator<IColumn> other = reversed ? cm.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY) : cm.iterator();
         IColumn otherColumn = other.next();
 
         columns.clear();
 
         while (idx < copy.length && otherColumn != null)
         {
-            int c = compare(copy[idx].name(), otherColumn.name());
+            int c = internalComparator().compare(copy[idx].name(), otherColumn.name());
             if (c < 0)
             {
                 columns.add(copy[idx]);
@@ -298,56 +304,70 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
     @Override
     public Iterator<IColumn> iterator()
     {
-        return reversed ? reverseInternalIterator(size()) : columns.iterator();
+        return reversed ? Lists.reverse(columns).iterator() : columns.iterator();
     }
 
-    public Iterator<IColumn> reverseIterator()
+    public Iterator<IColumn> iterator(ColumnSlice[] slices)
     {
-        return reversed ? columns.iterator() : reverseInternalIterator(size());
+        return new SlicesIterator(columns, comparator, slices, reversed);
     }
 
-    public Iterator<IColumn> iterator(ByteBuffer start)
+    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
     {
-        int idx = binarySearch(start);
-        if (idx < 0)
-            idx = -idx - 1;
-        else if (reversed)
-            // listIterator.previous() doesn't return the current element at first but the previous one
-            idx++;
-        return reversed ? reverseInternalIterator(idx) : columns.listIterator(idx);
+        return new SlicesIterator(columns, comparator, slices, !reversed);
     }
 
-    public Iterator<IColumn> reverseIterator(ByteBuffer start)
+    private static class SlicesIterator extends AbstractIterator<IColumn>
     {
-        int idx = binarySearch(start);
-        if (idx < 0)
-            idx = -idx - 1;
-        else if (!reversed)
-            // listIterator.previous() doesn't return the current element at first but the previous one
-            idx++;
-        return reversed ? columns.listIterator(idx) : reverseInternalIterator(idx);
-    }
+        private final List<IColumn> list;
+        private final ColumnSlice[] slices;
+        private final Comparator<ByteBuffer> comparator;
 
-    private Iterator<IColumn> reverseInternalIterator(int idx)
-    {
-        final ListIterator<IColumn> iter = columns.listIterator(idx);
-        return new Iterator<IColumn>()
+        private int idx = 0;
+        private int previousSliceEnd = 0;
+        private Iterator<IColumn> currentSlice;
+
+        public SlicesIterator(List<IColumn> list, AbstractType<?> comparator, ColumnSlice[] slices, boolean reversed)
         {
-            public boolean hasNext()
-            {
-                return iter.hasPrevious();
-            }
+            this.list = reversed ? Lists.reverse(list) : list;
+            this.slices = slices;
+            this.comparator = reversed ? comparator.reverseComparator : comparator;
+        }
 
-            public IColumn next()
+        protected IColumn computeNext()
+        {
+            if (currentSlice == null)
             {
-                return iter.previous();
+                if (idx >= slices.length)
+                    return endOfData();
+
+                ColumnSlice slice = slices[idx++];
+                // The first idx to include
+                int startIdx = slice.start.remaining() == 0 ? 0 : binarySearch(list, comparator, slice.start, previousSliceEnd);
+                if (startIdx < 0)
+                    startIdx = -startIdx - 1;
+
+                // The first idx to exclude
+                int finishIdx = slice.finish.remaining() == 0 ? list.size() - 1 : binarySearch(list, comparator, slice.finish, previousSliceEnd);
+                if (finishIdx >= 0)
+                    finishIdx++;
+                else
+                    finishIdx = -finishIdx - 1;
+
+                if (startIdx == 0 && finishIdx == list.size())
+                    currentSlice = list.iterator();
+                else
+                    currentSlice = list.subList(startIdx, finishIdx).iterator();
+
+                previousSliceEnd = finishIdx > 0 ? finishIdx - 1 : 0;
             }
 
-            public void remove()
-            {
-                iter.remove();
-            }
-        };
+            if (currentSlice.hasNext())
+                return currentSlice.next();
+
+            currentSlice = null;
+            return computeNext();
+        }
     }
 
     private class ReverseSortedCollection extends AbstractCollection<IColumn>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 16d2f47..a85b41e 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.base.Function;
 import edu.stanford.ppl.concurrent.SnapTreeMap;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -266,19 +267,14 @@ public class AtomicSortedColumns implements ISortedColumns
         return getSortedColumns().iterator();
     }
 
-    public Iterator<IColumn> reverseIterator()
+    public Iterator<IColumn> iterator(ColumnSlice[] slices)
     {
-        return getReverseSortedColumns().iterator();
+        return new ColumnSlice.NavigableMapIterator(ref.get().map, slices);
     }
 
-    public Iterator<IColumn> iterator(ByteBuffer start)
+    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
     {
-        return ref.get().map.tailMap(start).values().iterator();
-    }
-
-    public Iterator<IColumn> reverseIterator(ByteBuffer start)
-    {
-        return ref.get().map.descendingMap().tailMap(start).values().iterator();
+        return new ColumnSlice.NavigableMapIterator(ref.get().map.descendingMap(), slices);
     }
 
     public boolean isInsertReversed()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 73a62c2..fd209ea 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -25,6 +25,7 @@ import java.util.SortedSet;
 
 import com.google.common.base.Function;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.Allocator;
@@ -128,22 +129,16 @@ public interface ISortedColumns extends IIterableColumns
     public boolean isEmpty();
 
     /**
-     * Returns an iterator that iterates over the columns of this map in
-     * reverse order.
+     * Returns an iterator over the columns of this map that returns only the matching @param slices.
+     * The provided slices must be in order and must be non-overlapping.
      */
-    public Iterator<IColumn> reverseIterator();
+    public Iterator<IColumn> iterator(ColumnSlice[] slices);
 
     /**
-     * Returns an iterator over the columns of this map starting from the
-     * first column whose name is equal or greater than @param start.
+     * Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
+     * The provided slices must be in reversed order and must be non-overlapping.
      */
-    public Iterator<IColumn> iterator(ByteBuffer start);
-
-    /**
-     * Returns a reversed iterator over the columns of this map starting from
-     * the last column whose name is equal or lesser than @param start.
-     */
-    public Iterator<IColumn> reverseIterator(ByteBuffer start);
+    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices);
 
     /**
      * Returns if this map only support inserts in reverse order.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ccb4589..0f9c2ed 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -369,9 +369,7 @@ public class Memtable
     public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
     {
         assert cf != null;
-        final Iterator<IColumn> filteredIter = filter.reversed
-                                             ? (filter.start.remaining() == 0 ? cf.reverseIterator() : cf.reverseIterator(filter.start))
-                                             : cf.iterator(filter.start);
+        final Iterator<IColumn> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
 
         return new AbstractColumnIterator()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index ccc87e8..c0054ea 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -183,7 +183,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         else
         {
             SliceQueryFilter sqf = (SliceQueryFilter)predicate;
-            sp.setSlice_range(new SliceRange(sqf.start, sqf.finish, sqf.reversed, sqf.count));
+            sp.setSlice_range(new SliceRange(sqf.start(), sqf.finish(), sqf.reversed, sqf.count));
         }
         return sp;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 1f6a2c9..b9d8a17 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -86,7 +86,7 @@ public class SliceFromReadCommand extends ReadCommand
             // columns, only l/t end up live after reconciliation. So for next
             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
             int retryCount = liveColumnsInRow == 0 ? count + 1 : ((count * count) / liveColumnsInRow) + 1;
-            SliceQueryFilter newFilter = new SliceQueryFilter(filter.start, filter.finish, filter.reversed, retryCount);
+            SliceQueryFilter newFilter = new SliceQueryFilter(filter.slices, filter.reversed, retryCount);
             return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
index 84faf14..beb33ac 100644
--- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 
 import com.google.common.base.Function;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -167,18 +168,13 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
         return map.values().iterator();
     }
 
-    public Iterator<IColumn> reverseIterator()
+    public Iterator<IColumn> iterator(ColumnSlice[] slices)
     {
-        return getReverseSortedColumns().iterator();
+        return new ColumnSlice.NavigableMapIterator(map, slices);
     }
 
-    public Iterator<IColumn> iterator(ByteBuffer start)
+    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
     {
-        return map.tailMap(start).values().iterator();
-    }
-
-    public Iterator<IColumn> reverseIterator(ByteBuffer start)
-    {
-        return map.descendingMap().tailMap(start).values().iterator();
+        return new ColumnSlice.NavigableMapIterator(map.descendingMap(), slices);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index 9895bc4..51779e3 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 
 import com.google.common.base.Function;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
@@ -185,18 +186,13 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return map.values().iterator();
     }
 
-    public Iterator<IColumn> reverseIterator()
+    public Iterator<IColumn> iterator(ColumnSlice[] slices)
     {
-        return getReverseSortedColumns().iterator();
+        return new ColumnSlice.NavigableMapIterator(map, slices);
     }
 
-    public Iterator<IColumn> iterator(ByteBuffer start)
+    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
     {
-        return map.tailMap(start).values().iterator();
-    }
-
-    public Iterator<IColumn> reverseIterator(ByteBuffer start)
-    {
-        return map.descendingMap().tailMap(start).values().iterator();
+        return new ColumnSlice.NavigableMapIterator(map.descendingMap(), slices);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index aee4f05..d951891 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.AbstractIterator;
@@ -31,17 +32,19 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 
 /**
- *  This is a reader that finds the block for a starting column and returns
- *  blocks before/after it for each next call. This function assumes that
- *  the CF is sorted by name and exploits the name index.
+ * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call.
+ * This function assumes that the CF is sorted by name and exploits the name index.
  */
 class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 {
@@ -51,21 +54,24 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
     private final List<IndexHelper.IndexInfo> indexes;
     private final FileDataInput originalInput;
     private FileDataInput file;
-    private final ByteBuffer startColumn;
-    private final ByteBuffer finishColumn;
     private final boolean reversed;
-
+    private final ColumnSlice[] slices;
     private final BlockFetcher fetcher;
     private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
     private final AbstractType<?> comparator;
 
-    public IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
+    /**
+     * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in
+     * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of
+     * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also
+     * assumes that validation has been performed in terms of intervals (no overlapping intervals).
+     */
+    public IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed)
     {
         this.sstable = sstable;
         this.originalInput = input;
-        this.startColumn = startColumn;
-        this.finishColumn = finishColumn;
         this.reversed = reversed;
+        this.slices = slices;
         this.comparator = sstable.metadata.comparator;
 
         try
@@ -84,7 +90,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 {
                     this.emptyColumnFamily = ColumnFamily.create(sstable.metadata);
                     emptyColumnFamily.delete(indexEntry.deletionInfo());
-                    fetcher = new IndexedBlockFetcher(indexEntry);
+                    fetcher = new IndexedBlockFetcher(indexEntry.position);
                 }
             }
             else
@@ -94,7 +100,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 this.indexes = IndexHelper.deserializeIndex(file);
                 this.emptyColumnFamily = ColumnFamily.create(sstable.metadata);
                 emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
-                fetcher = indexes.isEmpty() ? new SimpleBlockFetcher() : new IndexedBlockFetcher();
+                fetcher = indexes.isEmpty()
+                        ? new SimpleBlockFetcher()
+                        : new IndexedBlockFetcher(file.getFilePointer() + 4); // We still have the column count to
+                                                                              // skip to get the basePosition
             }
         }
         catch (IOException e)
@@ -104,6 +113,9 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         }
     }
 
+    /**
+     * Sets the seek position to the start of the row for column scanning.
+     */
     private void setToRowStart(SSTableReader reader, RowIndexEntry indexEntry, FileDataInput input) throws IOException
     {
         if (input == null)
@@ -129,39 +141,26 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         throw new UnsupportedOperationException();
     }
 
-    private boolean isColumnNeeded(OnDiskAtom column)
-    {
-        if (startColumn.remaining() == 0 && finishColumn.remaining() == 0)
-            return true;
-        else if (startColumn.remaining() == 0 && !reversed)
-            return comparator.compare(column.name(), finishColumn) <= 0;
-        else if (startColumn.remaining() == 0 && reversed)
-            return comparator.compare(column.name(), finishColumn) >= 0;
-        else if (finishColumn.remaining() == 0 && !reversed)
-            return comparator.compare(column.name(), startColumn) >= 0;
-        else if (finishColumn.remaining() == 0 && reversed)
-            return comparator.compare(column.name(), startColumn) <= 0;
-        else if (!reversed)
-            return comparator.compare(column.name(), startColumn) >= 0 && comparator.compare(column.name(), finishColumn) <= 0;
-        else // if reversed
-            return comparator.compare(column.name(), startColumn) <= 0 && comparator.compare(column.name(), finishColumn) >= 0;
-    }
-
     protected OnDiskAtom computeNext()
     {
         while (true)
         {
             OnDiskAtom column = blockColumns.poll();
-            if (column != null && isColumnNeeded(column))
-                return column;
-            try
+            if (column == null)
             {
-                if (column == null && !fetcher.getNextBlock())
-                    return endOfData();
+                try
+                {
+                    if (!fetcher.fetchMoreData())
+                        return endOfData();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
-            catch (IOException e)
+            else
             {
-                throw new RuntimeException(e);
+                return column;
             }
         }
     }
@@ -172,53 +171,184 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             file.close();
     }
 
-    interface BlockFetcher
+    protected void addColumn(OnDiskAtom col)
     {
-        public boolean getNextBlock() throws IOException;
+        if (reversed)
+            blockColumns.addFirst(col);
+        else
+            blockColumns.addLast(col);
+    }
+
+    private abstract class BlockFetcher
+    {
+        protected int currentSliceIdx;
+
+        protected BlockFetcher(int sliceIdx)
+        {
+            this.currentSliceIdx = sliceIdx;
+        }
+
+        /*
+         * Return the smallest key selected by the current ColumnSlice.
+         */
+        protected ByteBuffer currentStart()
+        {
+            return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
+        }
+
+        /*
+         * Return the biggest key selected by the current ColumnSlice.
+         */
+        protected ByteBuffer currentFinish()
+        {
+            return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
+        }
+
+        protected abstract boolean setNextSlice();
+
+        protected abstract boolean fetchMoreData() throws IOException;
+
+        protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
+        {
+            return isBeforeSliceStart(column.name());
+        }
+
+        protected boolean isBeforeSliceStart(ByteBuffer name)
+        {
+            ByteBuffer start = currentStart();
+            return start.remaining() != 0 && comparator.compare(name, start) < 0;
+        }
+
+        protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
+        {
+            ByteBuffer finish = currentFinish();
+            return finish.remaining() == 0 || comparator.compare(column.name(), finish) <= 0;
+        }
+
+        protected boolean isAfterSliceFinish(ByteBuffer name)
+        {
+            ByteBuffer finish = currentFinish();
+            return finish.remaining() != 0 && comparator.compare(name, finish) > 0;
+        }
     }
 
-    private class IndexedBlockFetcher implements BlockFetcher
+    private class IndexedBlockFetcher extends BlockFetcher
     {
+        // where this row starts
         private final long basePosition;
-        private int curRangeIndex;
 
-        IndexedBlockFetcher() throws IOException
+        // the index entry for the next block to deserialize
+        private int nextIndexIdx = -1;
+
+        // index of the last block we've read from disk;
+        private int lastDeserializedBlock = -1;
+
+        // For reversed, keep columns at the beginning of the last deserialized block that
+        // may still match a slice
+        private final Deque<OnDiskAtom> prefetched;
+
+        public IndexedBlockFetcher(long basePosition)
         {
-            file.readInt(); // column count
-            basePosition = file.getFilePointer();
-            curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
+            super(-1);
+            this.basePosition = basePosition;
+            this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
+            setNextSlice();
         }
 
-        IndexedBlockFetcher(RowIndexEntry indexEntry)
+        protected boolean setNextSlice()
         {
-            basePosition = indexEntry.position;
-            curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
+            while (++currentSliceIdx < slices.length)
+            {
+                nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx);
+                if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
+                    // no index block for that slice
+                    continue;
+
+                // Check if we can exclude this slice entirely from the index
+                IndexInfo info = indexes.get(nextIndexIdx);
+                if (reversed)
+                {
+                    if (!isBeforeSliceStart(info.lastName))
+                        return true;
+                }
+                else
+                {
+                    if (!isAfterSliceFinish(info.firstName))
+                        return true;
+                }
+            }
+            nextIndexIdx = -1;
+            return false;
         }
 
-        public boolean getNextBlock() throws IOException
+        protected boolean hasMoreSlice()
         {
-            if (curRangeIndex < 0 || curRangeIndex >= indexes.size())
-                return false;
+            return currentSliceIdx < slices.length;
+        }
 
-            /* seek to the correct offset to the data, and calculate the data size */
-            IndexHelper.IndexInfo curColPosition = indexes.get(curRangeIndex);
+        protected boolean fetchMoreData() throws IOException
+        {
+            if (!hasMoreSlice())
+                return false;
 
-            /* see if this read is really necessary. */
-            if (reversed)
+            // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
+            // Note that prefetched keeps columns in reversed disk order.
+            if (reversed && !prefetched.isEmpty())
             {
-                if ((finishColumn.remaining() > 0 && comparator.compare(finishColumn, curColPosition.lastName) > 0) ||
-                    (startColumn.remaining() > 0 && comparator.compare(startColumn, curColPosition.firstName) < 0))
-                    return false;
+                boolean gotSome = false;
+                // Avoids some comparison when we know it's not useful
+                boolean inSlice = false;
+
+                OnDiskAtom prefetchedCol;
+                while ((prefetchedCol = prefetched.peek() ) != null)
+                {
+                    // col is before slice, we update the slice
+                    if (isColumnBeforeSliceStart(prefetchedCol))
+                    {
+                        inSlice = false;
+                        if (!setNextSlice())
+                            return false;
+                    }
+                    // col is within slice, all columns
+                    // (we go in reverse, so as soon as we are in a slice, no need to check
+                    // we're after the slice until we change slice)
+                    else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
+                    {
+                        blockColumns.addLast(prefetched.poll());
+                        gotSome = true;
+                        inSlice = true;
+                    }
+                    // if col is after slice, ignore
+                    else
+                    {
+                        prefetched.poll();
+                    }
+                }
+                if (gotSome)
+                    return true;
             }
-            else
+            return getNextBlock();
+        }
+
+        private boolean getNextBlock() throws IOException
+        {
+            if (lastDeserializedBlock == nextIndexIdx)
             {
-                if ((startColumn.remaining() > 0 && comparator.compare(startColumn, curColPosition.lastName) > 0) ||
-                    (finishColumn.remaining() > 0 && comparator.compare(finishColumn, curColPosition.firstName) < 0))
-                    return false;
+                if (reversed)
+                    nextIndexIdx--;
+                else
+                    nextIndexIdx++;
             }
+            lastDeserializedBlock = nextIndexIdx;
 
-            boolean outOfBounds = false;
-            long positionToSeek = basePosition + curColPosition.offset;
+            // Are we done?
+            if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
+                return false;
+
+            IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
+
+            /* seek to the correct offset to the data, and calculate the data size */
+            long positionToSeek = basePosition + currentIndex.offset;
 
             // With new promoted indexes, our first seek in the data file will happen at that point.
             if (file == null)
@@ -227,55 +357,133 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             OnDiskAtom.Serializer atomSerializer = emptyColumnFamily.getOnDiskSerializer();
             file.seek(positionToSeek);
             FileMark mark = file.mark();
-            while (file.bytesPastMark(mark) < curColPosition.width && !outOfBounds)
+
+            // We remenber when we are whithin a slice to avoid some comparison
+            boolean inSlice = false;
+
+            // scan from index start
+            OnDiskAtom column = null;
+            while (file.bytesPastMark(mark) < currentIndex.width)
             {
-                OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
-                if (reversed)
-                    blockColumns.addFirst(column);
+                // Only fetch a new column if we haven't dealt with the previous one.
+                if (column == null)
+                    column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+
+                // col is before slice
+                // (If in slice, don't bother checking that until we change slice)
+                if (!inSlice && isColumnBeforeSliceStart(column))
+                {
+                    if (reversed)
+                    {
+                        // the next slice select columns that are before the current one, so it may
+                        // match this column, so keep it around.
+                        prefetched.addFirst(column);
+                    }
+                    column = null;
+                }
+                // col is within slice
+                else if (isColumnBeforeSliceFinish(column))
+                {
+                    inSlice = true;
+                    addColumn(column);
+                    column = null;
+                }
+                // col is after slice.
                 else
-                    blockColumns.addLast(column);
+                {
+                    // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
+                    // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
+                    // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first
+                    // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
+                    if (reversed && prefetched.isEmpty())
+                        break;
 
-                /* see if we can stop seeking. */
-                if (!reversed && finishColumn.remaining() > 0)
-                    outOfBounds = comparator.compare(column.name(), finishColumn) >= 0;
-                else if (reversed && startColumn.remaining() > 0)
-                    outOfBounds = comparator.compare(column.name(), startColumn) >= 0;
-            }
+                    if (!setNextSlice())
+                        break;
 
-            if (reversed)
-                curRangeIndex--;
-            else
-                curRangeIndex++;
+                    inSlice = false;
+
+                    // The next index block now corresponds to the first block that may have columns for the newly set slice.
+                    // So if it's different from the current block, we're done with this block. And in that case, we know
+                    // that our prefetched columns won't match.
+                    if (nextIndexIdx != lastDeserializedBlock)
+                    {
+                        if (reversed)
+                            prefetched.clear();
+                        break;
+                    }
+
+                    // Even if the next slice may have column in this blocks, if we're reversed, those columns have been
+                    // prefetched and we're done with that block
+                    if (reversed)
+                        break;
+
+                    // otherwise, we will deal with that column at the next iteration
+                }
+            }
             return true;
         }
     }
 
-    private class SimpleBlockFetcher implements BlockFetcher
+    private class SimpleBlockFetcher extends BlockFetcher
     {
-        private SimpleBlockFetcher() throws IOException
+        public SimpleBlockFetcher() throws IOException
         {
+            // Since we have to deserialize in order and will read all slices might as well reverse the slices and
+            // behave as if it was not reversed
+            super(reversed ? slices.length - 1 : 0);
+
+            // We remenber when we are whithin a slice to avoid some comparison
+            boolean inSlice = false;
+
             OnDiskAtom.Serializer atomSerializer = emptyColumnFamily.getOnDiskSerializer();
             int columns = file.readInt();
+
             for (int i = 0; i < columns; i++)
             {
                 OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
-                if (reversed)
-                    blockColumns.addFirst(column);
+
+                // col is before slice
+                // (If in slice, don't bother checking that until we change slice)
+                if (!inSlice && isColumnBeforeSliceStart(column))
+                    continue;
+
+                // col is within slice
+                if (isColumnBeforeSliceFinish(column))
+                {
+                    inSlice = true;
+                    addColumn(column);
+                }
+                // col is after slice. more slices?
                 else
-                    blockColumns.addLast(column);
-
-                /* see if we can stop seeking. */
-                boolean outOfBounds = false;
-                if (!reversed && finishColumn.remaining() > 0)
-                    outOfBounds = comparator.compare(column.name(), finishColumn) >= 0;
-                else if (reversed && startColumn.remaining() > 0)
-                    outOfBounds = comparator.compare(column.name(), startColumn) >= 0;
-                if (outOfBounds)
-                    break;
+                {
+                    inSlice = false;
+                    if (!setNextSlice())
+                        break;
+                }
             }
         }
 
-        public boolean getNextBlock() throws IOException
+        protected boolean setNextSlice()
+        {
+            if (reversed)
+            {
+                if (currentSliceIdx <= 0)
+                    return false;
+
+                currentSliceIdx--;
+            }
+            else
+            {
+                if (currentSliceIdx >= slices.length - 1)
+                    return false;
+
+                currentSliceIdx++;
+            }
+            return true;
+        }
+
+        protected boolean fetchMoreData()
         {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 9030f0e..6a8cb33 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -222,16 +222,19 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
     {
         /* get the various column ranges we have to read */
         AbstractType<?> comparator = metadata.comparator;
-        SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator, false));
+        List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
+        int lastIndexIdx = -1;
         for (ByteBuffer name : filteredColumnNames)
         {
-            int index = IndexHelper.indexFor(name, indexList, comparator, false);
+            int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
             if (index == indexList.size())
                 continue;
             IndexHelper.IndexInfo indexInfo = indexList.get(index);
-            if (comparator.compare(name, indexInfo.firstName) < 0)
+            // Check the index block does contain the column names and that we haven't inserted this block yet.
+            if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
                 continue;
             ranges.add(indexInfo);
+            lastIndexIdx = index;
         }
 
         if (ranges.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
index a45c94a..da72eb9 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
@@ -19,12 +19,15 @@ package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.Pair;
 
 /**
  *  A Column Iterator over SSTable
@@ -34,11 +37,11 @@ public class SSTableSliceIterator implements OnDiskAtomIterator
     private final OnDiskAtomIterator reader;
     private final DecoratedKey key;
 
-    public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
+    public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed)
     {
         this.key = key;
         RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, startColumn, finishColumn, reversed);
+        this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed);
     }
 
     /**
@@ -53,17 +56,17 @@ public class SSTableSliceIterator implements OnDiskAtomIterator
      * @param finishColumn The end of the slice
      * @param reversed Results are returned in reverse order iff reversed is true.
      */
-    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed, RowIndexEntry indexEntry)
+    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry)
     {
         this.key = key;
-        reader = createReader(sstable, indexEntry, file, startColumn, finishColumn, reversed);
+        reader = createReader(sstable, indexEntry, file, slices, reversed);
     }
 
-    private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
+    private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
     {
-        return startColumn.remaining() == 0 && !reversed
-                 ? new SimpleSliceReader(sstable, indexEntry, file, finishColumn)
-                 : new IndexedSliceReader(sstable, indexEntry, file, startColumn, finishColumn, reversed);
+        return slices.length == 1 && slices[0].start.remaining() == 0 && !reversed
+             ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish)
+             : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed);
     }
 
     public DecoratedKey getKey()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
new file mode 100644
index 0000000..2a295eb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnSlice
+{
+    public static final Serializer serializer = new Serializer();
+
+    public static final ColumnSlice ALL_COLUMNS = new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    public static final ColumnSlice[] ALL_COLUMNS_ARRAY = new ColumnSlice[]{ ALL_COLUMNS };
+
+    public final ByteBuffer start;
+    public final ByteBuffer finish;
+
+    public ColumnSlice(ByteBuffer start, ByteBuffer finish)
+    {
+        assert start != null && finish != null;
+        this.start = start;
+        this.finish = finish;
+    }
+
+    /**
+     * Validate an array of column slices.
+     * To be valid, the slices must be sorted and non-overlapping and each slice must be valid.
+     *
+     * @throws IllegalArgumentException if the input slices are not valid.
+     */
+    public static void validate(ColumnSlice[] slices, AbstractType<?> comparator, boolean reversed)
+    {
+        for (int i = 0; i < slices.length; i++)
+        {
+            ColumnSlice slice = slices[i];
+            validate(slice, comparator, reversed);
+            if (i > 0)
+            {
+                if (slices[i - 1].finish.remaining() == 0 || slice.start.remaining() == 0)
+                    throw new IllegalArgumentException("Invalid column slices: slices must be sorted and non-overlapping");
+
+                int cmp = comparator.compare(slices[i -1].finish, slice.start);
+                if (reversed ? cmp <= 0 : cmp >= 0)
+                    throw new IllegalArgumentException("Invalid column slices: slices must be sorted and non-overlapping");
+            }
+        }
+    }
+
+    /**
+     * Validate a column slices.
+     * To be valid, the slice start must sort before the slice end.
+     *
+     * @throws IllegalArgumentException if the slice is not valid.
+     */
+    public static void validate(ColumnSlice slice, AbstractType<?> comparator, boolean reversed)
+    {
+        Comparator<ByteBuffer> orderedComparator = reversed ? comparator.reverseComparator : comparator;
+        if (slice.start.remaining() > 0 && slice.finish.remaining() > 0 && orderedComparator.compare(slice.start, slice.finish) > 0)
+            throw new IllegalArgumentException("Slice finish must come after start in traversal order");
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        int hashCode = 31 + start.hashCode();
+        return 31*hashCode + finish.hashCode();
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if(!(o instanceof ColumnSlice))
+            return false;
+        ColumnSlice that = (ColumnSlice)o;
+        return start.equals(that.start) && finish.equals(that.finish);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "[" + ByteBufferUtil.bytesToHex(start) + ", " + ByteBufferUtil.bytesToHex(finish) + "]";
+    }
+
+    public static class Serializer implements IVersionedSerializer<ColumnSlice>
+    {
+        public void serialize(ColumnSlice cs, DataOutput dos, int version) throws IOException
+        {
+            ByteBufferUtil.writeWithShortLength(cs.start, dos);
+            ByteBufferUtil.writeWithShortLength(cs.finish, dos);
+        }
+
+        public ColumnSlice deserialize(DataInput dis, int version) throws IOException
+        {
+            ByteBuffer start = ByteBufferUtil.readWithShortLength(dis);
+            ByteBuffer finish = ByteBufferUtil.readWithShortLength(dis);
+            return new ColumnSlice(start, finish);
+        }
+
+        public long serializedSize(ColumnSlice cs, int version)
+        {
+            TypeSizes sizes = TypeSizes.NATIVE;
+
+            int startSize = cs.start.remaining();
+            int finishSize = cs.finish.remaining();
+
+            int size = 0;
+            size += sizes.sizeof((short) startSize) + startSize;
+            size += sizes.sizeof((short) finishSize) + finishSize;
+            return size;
+        }
+    }
+
+    public static class NavigableMapIterator extends AbstractIterator<IColumn>
+    {
+        private final NavigableMap<ByteBuffer, IColumn> map;
+        private final ColumnSlice[] slices;
+
+        private int idx = 0;
+        private Iterator<IColumn> currentSlice;
+
+        public NavigableMapIterator(NavigableMap<ByteBuffer, IColumn> map, ColumnSlice[] slices)
+        {
+            this.map = map;
+            this.slices = slices;
+        }
+
+        protected IColumn computeNext()
+        {
+            if (currentSlice == null)
+            {
+                if (idx >= slices.length)
+                    return endOfData();
+
+                ColumnSlice slice = slices[idx++];
+                // Note: we specialize the case of start == "" and finish = "" because it is slightly more efficient, but also they have a specific
+                // meaning (namely, they always extend to the beginning/end of the range).
+                if (slice.start.remaining() == 0)
+                {
+                    if (slice.finish.remaining() == 0)
+                        currentSlice = map.values().iterator();
+                    else
+                        currentSlice = map.headMap(slice.finish, true).values().iterator();
+                }
+                else if (slice.finish.remaining() == 0)
+                {
+                    currentSlice = map.tailMap(slice.start, true).values().iterator();
+                }
+                else
+                {
+                    currentSlice = map.subMap(slice.start, true, slice.finish, true).values().iterator();
+                }
+            }
+
+            if (currentSlice.hasNext())
+                return currentSlice.next();
+
+            currentSlice = null;
+            return computeNext();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 497e074..b3736e8 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -70,7 +70,7 @@ public abstract class ExtendedFilter
         this.isPaging = isPaging;
         if (maxIsColumns)
             originalFilter.updateColumnsLimit(maxResults);
-        if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish.remaining() != 0))
+        if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0))
             throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column");
     }
 
@@ -92,7 +92,7 @@ public abstract class ExtendedFilter
     {
         // As soon as we'd done our first call, we want to reset the start column if we're paging
         if (isPaging)
-            ((SliceQueryFilter)initialFilter()).start = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            ((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
         if (!maxIsColumns)
             return;
@@ -212,8 +212,9 @@ public abstract class ExtendedFilter
 
             SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
             // Check if we've fetch the whole row
-            if (filter.start.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-             && filter.finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            if (filter.slices.length == 1
+             && filter.start().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+             && filter.finish().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
              && filter.count == Integer.MAX_VALUE)
                 return false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index ebf2c3c..adf4204 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.filter;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SliceQueryFilter implements IFilter
@@ -43,15 +45,22 @@ public class SliceQueryFilter implements IFilter
     private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
     public static final Serializer serializer = new Serializer();
 
-    public volatile ByteBuffer start;
-    public volatile ByteBuffer finish;
+    public final ColumnSlice[] slices;
     public final boolean reversed;
     public volatile int count;
 
     public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
     {
-        this.start = start;
-        this.finish = finish;
+        this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count);
+    }
+
+    /**
+     * Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or
+     * reversed).
+     */
+    public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count)
+    {
+        this.slices = slices;
         this.reversed = reversed;
         this.count = count;
     }
@@ -63,12 +72,12 @@ public class SliceQueryFilter implements IFilter
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
     {
-        return new SSTableSliceIterator(sstable, key, start, finish, reversed);
+        return new SSTableSliceIterator(sstable, key, slices, reversed);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
-        return new SSTableSliceIterator(sstable, file, key, start, finish, reversed, indexEntry);
+        return new SSTableSliceIterator(sstable, file, key, slices, reversed, indexEntry);
     }
 
     public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
@@ -92,13 +101,13 @@ public class SliceQueryFilter implements IFilter
         while (subcolumns.hasNext())
         {
             IColumn column = subcolumns.next();
-            if (comparator.compare(column.name(), start) >= 0)
+            if (comparator.compare(column.name(), start()) >= 0)
             {
                 subcolumns = Iterators.concat(Iterators.singletonIterator(column), subcolumns);
                 break;
             }
         }
-        // subcolumns is either empty now, or has been redefined in the loop above.  either is ok.
+        // subcolumns is either empty now, or has been redefined in the loop above. either is ok.
         collectReducedColumns(scFiltered, subcolumns, gcBefore);
         return scFiltered;
     }
@@ -123,11 +132,6 @@ public class SliceQueryFilter implements IFilter
                 logger.debug(String.format("collecting %s of %s: %s",
                                            liveColumns, count, column.getString(comparator)));
 
-            if (finish.remaining() > 0
-                && ((!reversed && comparator.compare(column.name(), finish) > 0))
-                    || (reversed && comparator.compare(column.name(), finish) < 0))
-                break;
-
             // only count live columns towards the `count` criteria
             if (column.isLive()
                 && (!container.deletionInfo().isDeleted(column)))
@@ -141,13 +145,26 @@ public class SliceQueryFilter implements IFilter
         }
     }
 
+    public ByteBuffer start()
+    {
+        return this.slices[0].start;
+    }
+
+    public ByteBuffer finish()
+    {
+        return this.slices[slices.length - 1].finish;
+    }
+
+    public void setStart(ByteBuffer start)
+    {
+        assert slices.length == 1;
+        this.slices[0] = new ColumnSlice(start, this.slices[0].finish);
+    }
+
     @Override
-    public String toString() {
-        return getClass().getSimpleName() + "(" +
-               "start=" + start +
-               ", finish=" + finish +
-               ", reversed=" + reversed +
-               ", count=" + count + "]";
+    public String toString()
+    {
+        return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + "]";
     }
 
     public boolean isReversed()
@@ -164,19 +181,38 @@ public class SliceQueryFilter implements IFilter
     {
         public void serialize(SliceQueryFilter f, DataOutput dos, int version) throws IOException
         {
-            ByteBufferUtil.writeWithShortLength(f.start, dos);
-            ByteBufferUtil.writeWithShortLength(f.finish, dos);
+            if (version < MessagingService.VERSION_12)
+            {
+                // It's kind of lame, but probably better than throwing an exception
+                ColumnSlice slice = new ColumnSlice(f.start(), f.finish());
+                ColumnSlice.serializer.serialize(slice, dos, version);
+            }
+            else
+            {
+                dos.writeInt(f.slices.length);
+                for (ColumnSlice slice : f.slices)
+                    ColumnSlice.serializer.serialize(slice, dos, version);
+            }
             dos.writeBoolean(f.reversed);
             dos.writeInt(f.count);
         }
 
         public SliceQueryFilter deserialize(DataInput dis, int version) throws IOException
         {
-            ByteBuffer start = ByteBufferUtil.readWithShortLength(dis);
-            ByteBuffer finish = ByteBufferUtil.readWithShortLength(dis);
+            ColumnSlice[] slices;
+            if (version < MessagingService.VERSION_12)
+            {
+                slices = new ColumnSlice[]{ ColumnSlice.serializer.deserialize(dis, version) };
+            }
+            else
+            {
+                slices = new ColumnSlice[dis.readInt()];
+                for (int i = 0; i < slices.length; i++)
+                    slices[i] = ColumnSlice.serializer.deserialize(dis, version);
+            }
             boolean reversed = dis.readBoolean();
             int count = dis.readInt();
-            return new SliceQueryFilter(start, finish, reversed, count);
+            return new SliceQueryFilter(slices, reversed, count);
         }
 
         public long serializedSize(SliceQueryFilter f, int version)
@@ -184,11 +220,16 @@ public class SliceQueryFilter implements IFilter
             TypeSizes sizes = TypeSizes.NATIVE;
 
             int size = 0;
-            int startSize = f.start.remaining();
-            int finishSize = f.finish.remaining();
-
-            size += sizes.sizeof((short) startSize) + startSize;
-            size += sizes.sizeof((short) finishSize) + finishSize;
+            if (version < MessagingService.VERSION_12)
+            {
+                size += ColumnSlice.serializer.serializedSize(new ColumnSlice(f.start(), f.finish()), version);
+            }
+            else
+            {
+                size += sizes.sizeof(f.slices.length);
+                for (ColumnSlice slice : f.slices)
+                    size += ColumnSlice.serializer.serializedSize(slice, version);
+            }
             size += sizes.sizeof(f.reversed);
             size += sizes.sizeof(f.count);
             return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index f748e82..29e076a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -150,7 +150,7 @@ public class IndexHelper
      *
      * @return int index
      */
-    public static int indexFor(ByteBuffer name, List<IndexInfo> indexList, AbstractType<?> comparator, boolean reversed)
+    public static int indexFor(ByteBuffer name, List<IndexInfo> indexList, AbstractType<?> comparator, boolean reversed, int lastIndex)
     {
         if (name.remaining() == 0 && reversed)
             return indexList.size() - 1;
@@ -168,8 +168,22 @@ public class IndexHelper
         i.e. 17 in this example, compared to the firstName part of the index slots.  bsearch will give us the
         first slot where firstName > start ([20..25] here), so we subtract an extra one to get the slot just before.
         */
-        int index = Collections.binarySearch(indexList, target, getComparator(comparator, reversed));
-        return index < 0 ? -index - (reversed ? 2 : 1) : index;
+        int startIdx = 0;
+        List<IndexInfo> toSearch = indexList;
+        if (lastIndex >= 0)
+        {
+            if (reversed)
+            {
+                startIdx = lastIndex;
+                toSearch = indexList.subList(lastIndex, indexList.size());
+            }
+            else
+            {
+                toSearch = indexList.subList(0, lastIndex + 1);
+            }
+        }
+        int index = Collections.binarySearch(toSearch, target, getComparator(comparator, reversed));
+        return startIdx + (index < 0 ? -index - (reversed ? 2 : 1) : index);
     }
 
     public static Comparator<IndexInfo> getComparator(final AbstractType<?> nameComparator, boolean reversed)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 027bc99..c606ef5 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -391,6 +391,9 @@ public class ByteBufferUtil
 
     public static ByteBuffer read(DataInput in, int length) throws IOException
     {
+        if (length == 0)
+            return EMPTY_BYTE_BUFFER;
+
         if (in instanceof FileDataInput)
             return ((FileDataInput) in).readBytes(length);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/db.RangeSliceCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/db.RangeSliceCommand.bin b/test/data/serialization/1.2/db.RangeSliceCommand.bin
index f98775a..259037a 100644
Binary files a/test/data/serialization/1.2/db.RangeSliceCommand.bin and b/test/data/serialization/1.2/db.RangeSliceCommand.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/db.Row.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/db.Row.bin b/test/data/serialization/1.2/db.Row.bin
index 3b080a9..99e9be4 100644
Binary files a/test/data/serialization/1.2/db.Row.bin and b/test/data/serialization/1.2/db.Row.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/db.RowMutation.bin b/test/data/serialization/1.2/db.RowMutation.bin
index 83b5328..2800321 100644
Binary files a/test/data/serialization/1.2/db.RowMutation.bin and b/test/data/serialization/1.2/db.RowMutation.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/db.SliceFromReadCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/db.SliceFromReadCommand.bin b/test/data/serialization/1.2/db.SliceFromReadCommand.bin
index 7160d24..b2741af 100644
Binary files a/test/data/serialization/1.2/db.SliceFromReadCommand.bin and b/test/data/serialization/1.2/db.SliceFromReadCommand.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/gms.EndpointState.bin b/test/data/serialization/1.2/gms.EndpointState.bin
index 404e27a..d44c6ab 100644
Binary files a/test/data/serialization/1.2/gms.EndpointState.bin and b/test/data/serialization/1.2/gms.EndpointState.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/service.TreeResponse.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/service.TreeResponse.bin b/test/data/serialization/1.2/service.TreeResponse.bin
index 78e8121..b9de706 100644
Binary files a/test/data/serialization/1.2/service.TreeResponse.bin and b/test/data/serialization/1.2/service.TreeResponse.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/data/serialization/1.2/utils.BloomFilter.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/utils.BloomFilter.bin b/test/data/serialization/1.2/utils.BloomFilter.bin
index e1b8bb0..baaad3c 100644
Binary files a/test/data/serialization/1.2/utils.BloomFilter.bin and b/test/data/serialization/1.2/utils.BloomFilter.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d1171ddc/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index a4580d3..252d3dd 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -32,6 +32,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Functions;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.utils.HeapAllocator;
 
@@ -158,10 +159,10 @@ public class ArrayBackedSortedColumnsTest
         //assertSame(new int[]{ 3, 5, 9 }, map.iterator(ByteBufferUtil.bytes(3)));
         //assertSame(new int[]{ 5, 9 }, map.iterator(ByteBufferUtil.bytes(4)));
 
-        assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(ByteBufferUtil.bytes(3)));
-        assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(ByteBufferUtil.bytes(4)));
+        assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(new ColumnSlice[]{ new ColumnSlice(ByteBufferUtil.bytes(3), ByteBufferUtil.EMPTY_BYTE_BUFFER) }));
+        assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(new ColumnSlice[]{ new ColumnSlice(ByteBufferUtil.bytes(4), ByteBufferUtil.EMPTY_BYTE_BUFFER) }));
 
-        assertSame(map.iterator(), map.iterator(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        assertSame(map.iterator(), map.iterator(ColumnSlice.ALL_COLUMNS_ARRAY));
     }
 
     private <T> void assertSame(Collection<T> c1, Collection<T> c2)
@@ -181,8 +182,9 @@ public class ArrayBackedSortedColumnsTest
     {
         for (int name : names)
         {
-            assert iter.hasNext();
-            assert name == ByteBufferUtil.toInt(iter.next().name());
+            assert iter.hasNext() : "Expected " + name + " but no more result";
+            int value = ByteBufferUtil.toInt(iter.next().name());
+            assert name == value : "Expected " + name + " but got " + value;
         }
     }
 }