You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/01/02 17:08:49 UTC

[3/4] Merge get_indexed_slices with get_range_slices

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 15478eb..940e00a 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -171,9 +171,21 @@ public class QueryProcessor
         AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(startKey, finishKey);
         
         // XXX: Our use of Thrift structs internally makes me Sad. :(
-        SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata,variables);
+        SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata, variables);
         validateSlicePredicate(metadata, thriftSlicePredicate);
 
+        List<IndexExpression> expressions = new ArrayList<IndexExpression>();
+        for (Relation columnRelation : select.getColumnRelations())
+        {
+            // Left and right side of relational expression encoded according to comparator/validator.
+            ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator, variables);
+            ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity), variables);
+
+            expressions.add(new IndexExpression(entity,
+                                                IndexOperator.valueOf(columnRelation.operator().toString()),
+                                                value));
+        }
+
         int limit = select.isKeyRange() && select.getKeyStart() != null
                   ? select.getNumRecords() + 1
                   : select.getNumRecords();
@@ -185,6 +197,7 @@ public class QueryProcessor
                                                                     null,
                                                                     thriftSlicePredicate,
                                                                     bounds,
+                                                                    expressions,
                                                                     limit),
                                                                     select.getConsistencyLevel());
         }
@@ -218,51 +231,7 @@ public class QueryProcessor
 
         return rows.subList(0, select.getNumRecords() < rows.size() ? select.getNumRecords() : rows.size());
     }
-    
-    private static List<org.apache.cassandra.db.Row> getIndexedSlices(CFMetaData metadata, SelectStatement select, List<String> variables)
-    throws TimedOutException, UnavailableException, InvalidRequestException
-    {
-        // XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
-        SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata, variables);
-        validateSlicePredicate(metadata, thriftSlicePredicate);
-        
-        List<IndexExpression> expressions = new ArrayList<IndexExpression>();
-        for (Relation columnRelation : select.getColumnRelations())
-        {
-            // Left and right side of relational expression encoded according to comparator/validator.
-            ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator, variables);
-            ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity), variables);
-            
-            expressions.add(new IndexExpression(entity,
-                                                IndexOperator.valueOf(columnRelation.operator().toString()),
-                                                value));
-        }
 
-        AbstractType<?> keyType = Schema.instance.getCFMetaData(metadata.ksName, select.getColumnFamily()).getKeyValidator();
-        ByteBuffer startKey = (!select.isKeyRange()) ? (new Term()).getByteBuffer() : select.getKeyStart().getByteBuffer(keyType, variables);
-        IndexClause thriftIndexClause = new IndexClause(expressions, startKey, select.getNumRecords());
-        
-        List<org.apache.cassandra.db.Row> rows;
-        try
-        {
-            rows = StorageProxy.scan(metadata.ksName,
-                                     select.getColumnFamily(),
-                                     thriftIndexClause,
-                                     thriftSlicePredicate,
-                                     select.getConsistencyLevel());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (TimeoutException e)
-        {
-            throw new TimedOutException();
-        }
-        
-        return rows;
-    }
-    
     private static void batchUpdate(ClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency, List<String> variables )
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
@@ -544,16 +513,7 @@ public class QueryProcessor
                 }
                 else
                 {
-                    // Range query
-                    if ((select.getKeyFinish() != null) || (select.getColumnRelations().size() == 0))
-                    {
-                        rows = multiRangeSlice(metadata, select, variables);
-                    }
-                    // Index scan
-                    else
-                    {
-                        rows = getIndexedSlices(metadata, select, variables);
-                    }
+                    rows = multiRangeSlice(metadata, select, variables);
                 }
 
                 // count resultset is a single column named "count"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 907ccb4..706f43b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,10 +30,10 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import javax.management.*;
 
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.service.CacheService;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +48,8 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -59,7 +61,7 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.IntervalTree.Interval;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -1265,87 +1267,139 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return columns;
     }
 
+    public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row> {}
+
     /**
-      * Fetch a range of rows and columns from memtables/sstables.
+      * Iterate over a range of rows and columns from memtables/sstables.
       *
       * @param superColumn optional SuperColumn to slice subcolumns of; null to slice top-level columns
       * @param range Either a Bounds, which includes start key, or a Range, which does not.
-      * @param maxResults Maximum rows to return
       * @param columnFilter description of the columns we're interested in for each row
-      * @return true if we found all keys we were looking for, otherwise false
      */
-    public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter)
+    public AbstractScanIterator getSequentialIterator(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, IFilter columnFilter)
     {
         assert range instanceof Bounds
                || !((Range)range).isWrapAround() || range.right.isMinimum()
                : range;
 
-        RowPosition startWith = range.left;
-        RowPosition stopAt = range.right;
+        final RowPosition startWith = range.left;
+        final RowPosition stopAt = range.right;
 
         QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
-        int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
 
         List<Row> rows;
-        ViewFragment view = markReferenced(startWith, stopAt);
+        final ViewFragment view = markReferenced(startWith, stopAt);
         try
         {
-            CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
-            rows = new ArrayList<Row>();
+            final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
+            final int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
 
-            try
+            return new AbstractScanIterator()
             {
-                // pull rows out of the iterator
                 boolean first = true;
-                while (iterator.hasNext())
+
+                protected Row computeNext()
                 {
+                    // pull a row out of the iterator
+                    if (!iterator.hasNext())
+                        return endOfData();
+
                     Row current = iterator.next();
                     DecoratedKey key = current.key;
 
                     if (!stopAt.isMinimum() && stopAt.compareTo(key) < 0)
-                        return rows;
+                        return endOfData();
 
                     // skip first one
                     if (range instanceof Bounds || !first || !key.equals(startWith))
                     {
-                        // TODO this is necessary because when we collate supercolumns together, we don't check
-                        // their subcolumns for relevance, so we need to do a second prune post facto here.
-                        rows.add(current.cf != null && current.cf.isSuper()
-                                 ? new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore))
-                                 : current);
                         if (logger.isDebugEnabled())
                             logger.debug("scanned " + key);
+                        // TODO this is necessary because when we collate supercolumns together, we don't check
+                        // their subcolumns for relevance, so we need to do a second prune post facto here.
+                        return current.cf != null && current.cf.isSuper()
+                             ? new Row(current.key, removeDeleted(current.cf, gcBefore))
+                             : current;
                     }
                     first = false;
 
-                    if (rows.size() >= maxResults)
-                        return rows;
+                    return computeNext();
                 }
-            }
-            finally
-            {
-                try
-                {
-                    iterator.close();
-                }
-                catch (IOException e)
+
+                public void close() throws IOException
                 {
-                    throw new IOError(e);
+                    SSTableReader.releaseReferences(view.sstables);
+                    try
+                    {
+                        iterator.close();
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IOError(e);
+                    }
                 }
-            }
+            };
         }
-        finally
+        catch (RuntimeException e)
         {
-            // separate finally block to release references in case getIterator() throws
+            // In case getIterator() throws, otherwise the iterator close method releases the references.
             SSTableReader.releaseReferences(view.sstables);
+            throw e;
         }
-
-        return rows;
     }
 
-    public List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter)
-    {
-        return indexManager.search(clause, range, dataFilter);
+    public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter, List<IndexExpression> rowFilter)
+    {
+        return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults));
+    }
+
+    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
+    {
+        return indexManager.search(clause, range, maxResults, dataFilter);
+    }
+
+    public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
+    {
+         List<Row> rows = new ArrayList<Row>();
+         try
+         {
+             while (rowIterator.hasNext() && rows.size() < filter.maxResults)
+             {
+                 // get the raw columns requested, and additional columns for the expressions if necessary
+                 Row rawRow = rowIterator.next();
+                 ColumnFamily data = rawRow.cf;
+
+                 // roughly
+                 IFilter extraFilter = filter.getExtraFilter(data);
+                 if (extraFilter != null)
+                 {
+                     QueryPath path = new QueryPath(columnFamily);
+                     ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
+                     if (cf != null)
+                         data.addAll(cf, HeapAllocator.instance);
+                 }
+
+                 if (!filter.isSatisfiedBy(data))
+                     continue;
+
+                 logger.debug("{} satisfies all filter expressions", data);
+                 // cut the resultset back to what was requested, if necessary
+                 data = filter.prune(data);
+                 rows.add(new Row(rawRow.key, data));
+             }
+             return rows;
+         }
+         finally
+         {
+             try
+             {
+                 rowIterator.close();
+             }
+             catch (IOException e)
+             {
+                 throw new IOError(e);
+             }
+         }
     }
 
     public AbstractType getComparator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 5df816d..272d415 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -348,7 +348,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         IPartitioner p = StorageService.getPartitioner();
         Range range = new Range(p.getMinimumToken(), p.getMinimumToken(), p);
         IFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
-        List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter);
+        List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter, null);
         for (Row row : rows)
         {
             Token<?> token = StorageService.getPartitioner().getTokenFactory().fromByteArray(row.key.key);
@@ -434,7 +434,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         List<Row> rows;
         try
         {
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index cc200ae..4ab821e 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -39,6 +39,8 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
@@ -47,15 +49,17 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
-import org.apache.cassandra.thrift.TBinaryProtocol;
 
 public class RangeSliceCommand implements MessageProducer, IReadCommand
 {
@@ -67,16 +71,22 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand
     public final ByteBuffer super_column;
 
     public final SlicePredicate predicate;
+    public final List<IndexExpression> row_filter;
 
     public final AbstractBounds<RowPosition> range;
     public final int max_keys;
 
-    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys)
+    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys)
     {
-        this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, max_keys);
+        this(keyspace, column_family, super_column, predicate, range, null, max_keys);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys)
+    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int max_keys)
+    {
+        this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, max_keys);
+    }
+
+    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int max_keys)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
@@ -84,6 +94,7 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand
         this.predicate = predicate;
         this.range = range;
         this.max_keys = max_keys;
+        this.row_filter = row_filter;
     }
 
     public Message getMessage(Integer version) throws IOException
@@ -105,6 +116,7 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand
                ", predicate=" + predicate +
                ", range=" + range +
                ", max_keys=" + max_keys +
+               ", row_filter =" + row_filter +
                '}';
     }
 
@@ -134,6 +146,20 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
 
         TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
         FBUtilities.serialize(ser, sliceCommand.predicate, dos);
+
+        if (version >= MessagingService.VERSION_11)
+        {
+            if (sliceCommand.row_filter == null)
+            {
+                dos.writeInt(0);
+            }
+            else
+            {
+                dos.writeInt(sliceCommand.row_filter.size());
+                for (IndexExpression expr : sliceCommand.row_filter)
+                    FBUtilities.serialize(ser, expr, dos);
+            }
+        }
         AbstractBounds.serializer().serialize(sliceCommand.range, dos, version);
         dos.writeInt(sliceCommand.max_keys);
     }
@@ -141,24 +167,37 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
     public RangeSliceCommand deserialize(DataInput dis, int version) throws IOException
     {
         String keyspace = dis.readUTF();
-        String column_family = dis.readUTF();
+        String columnFamily = dis.readUTF();
 
         int scLength = dis.readInt();
-        ByteBuffer super_column = null;
+        ByteBuffer superColumn = null;
         if (scLength > 0)
         {
             byte[] buf = new byte[scLength];
             dis.readFully(buf);
-            super_column = ByteBuffer.wrap(buf);
+            superColumn = ByteBuffer.wrap(buf);
         }
 
         TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
         SlicePredicate pred = new SlicePredicate();
         FBUtilities.deserialize(dser, pred, dis);
+
+        List<IndexExpression> rowFilter = null;
+        if (version >= MessagingService.VERSION_11)
+        {
+            int filterCount = dis.readInt();
+            rowFilter = new ArrayList<IndexExpression>(filterCount);
+            for (int i = 0; i < filterCount; i++)
+            {
+                IndexExpression expr = new IndexExpression();
+                FBUtilities.deserialize(dser, expr, dis);
+                rowFilter.add(expr);
+            }
+        }
         AbstractBounds<RowPosition> range = AbstractBounds.serializer().deserialize(dis, version).toRowBounds();
 
-        int max_keys = dis.readInt();
-        return new RangeSliceCommand(keyspace, column_family, super_column, pred, range, max_keys);
+        int maxKeys = dis.readInt();
+        return new RangeSliceCommand(keyspace, columnFamily, superColumn, pred, range, rowFilter, maxKeys);
     }
 
     public long serializedSize(RangeSliceCommand rangeSliceCommand, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
new file mode 100644
index 0000000..bf56e99
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -0,0 +1,250 @@
+package org.apache.cassandra.db.filter;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Extends a column filter (IFilter) to include a number of IndexExpression.
+ */
+public abstract class ExtendedFilter
+{
+    private static Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
+
+    public final ColumnFamilyStore cfs;
+    public final int maxResults;
+    protected final IFilter originalFilter;
+
+    public static ExtendedFilter create(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults)
+    {
+        if (clause == null || clause.isEmpty())
+            return new EmptyClauseFilter(cfs, filter, maxResults);
+        else
+            return new FilterWithClauses(cfs, filter, clause, maxResults);
+    }
+
+    protected ExtendedFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults)
+    {
+        assert cfs != null;
+        assert filter != null;
+        this.cfs = cfs;
+        this.originalFilter = filter;
+        this.maxResults = maxResults;
+    }
+
+    /** The initial filter we'll do our first slice with (either the original or a superset of it) */
+    public abstract IFilter initialFilter();
+
+    public abstract List<IndexExpression> getClause();
+
+    /**
+     * Returns a filter to query the columns from the clause that the initial slice filter may not have caught.
+     * @param data the data retrieved by the initial filter
+     * @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row)
+     */
+    public abstract IFilter getExtraFilter(ColumnFamily data);
+
+    /**
+     * @return data pruned down to the columns originally asked for
+     */
+    public abstract ColumnFamily prune(ColumnFamily data);
+
+    /**
+     * @return true if the provided data satisfies all the expressions from
+     * the clause of this filter.
+     */
+    public abstract boolean isSatisfiedBy(ColumnFamily data);
+
+    public static boolean satisfies(int comparison, IndexOperator op)
+    {
+        switch (op)
+        {
+            case EQ:
+                return comparison == 0;
+            case GTE:
+                return comparison >= 0;
+            case GT:
+                return comparison > 0;
+            case LTE:
+                return comparison <= 0;
+            case LT:
+                return comparison < 0;
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    private static class FilterWithClauses extends ExtendedFilter
+    {
+        protected final List<IndexExpression> clause;
+        protected final IFilter initialFilter;
+
+        public FilterWithClauses(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults)
+        {
+            super(cfs, filter, maxResults);
+            assert clause != null;
+            this.clause = clause;
+            this.initialFilter = computeInitialFilter();
+        }
+
+        /** Sets up the initial filter. */
+        private IFilter computeInitialFilter()
+        {
+            if (originalFilter instanceof SliceQueryFilter)
+            {
+                // if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
+                // otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
+                if (cfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
+                {
+                    logger.debug("Expanding slice filter to entire row to cover additional expressions");
+                    return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                ((SliceQueryFilter) originalFilter).reversed,
+                                                Integer.MAX_VALUE);
+                }
+            }
+            else
+            {
+                logger.debug("adding columns to original Filter to cover additional expressions");
+                assert originalFilter instanceof NamesQueryFilter;
+                SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+                for (IndexExpression expr : clause)
+                {
+                    columns.add(expr.column_name);
+                }
+                if (columns.size() > 0)
+                {
+                    columns.addAll(((NamesQueryFilter) originalFilter).columns);
+                    return new NamesQueryFilter(columns);
+                }
+            }
+            return originalFilter;
+        }
+
+        public IFilter initialFilter()
+        {
+            return initialFilter;
+        }
+
+        public List<IndexExpression> getClause()
+        {
+            return clause;
+        }
+
+        /*
+         * We may need an extra query only if the original was a slice query (and thus may have missed the expressions for the clause).
+         * Even then, there is no point in doing an extra query if the original filter grabbed the whole row.
+         * Lastly, we only need the extra query if we haven't yet got all the expressions from the clause.
+         */
+        private boolean needsExtraQuery(ColumnFamily data)
+        {
+            if (!(originalFilter instanceof SliceQueryFilter))
+                return false;
+
+            SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
+            // Check if we've fetched the whole row
+            if (filter.start.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+             && filter.finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+             && filter.count == Integer.MAX_VALUE)
+                return false;
+
+            for (IndexExpression expr : clause)
+            {
+                if (data.getColumn(expr.column_name) == null)
+                {
+                    logger.debug("adding extraFilter to cover additional expressions");
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public IFilter getExtraFilter(ColumnFamily data)
+        {
+            if (!needsExtraQuery(data))
+                return null;
+
+            // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
+            // why we do the dance of avoiding to query any column we already have (it's also more efficient anyway)
+            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+            for (IndexExpression expr : clause)
+            {
+                if (data.getColumn(expr.column_name) == null)
+                    columns.add(expr.column_name);
+            }
+            assert !columns.isEmpty();
+            return new NamesQueryFilter(columns);
+        }
+
+        public ColumnFamily prune(ColumnFamily data)
+        {
+            if (initialFilter == originalFilter)
+                return data;
+            ColumnFamily pruned = data.cloneMeShallow();
+            IColumnIterator iter = originalFilter.getMemtableColumnIterator(data, null);
+            originalFilter.collectReducedColumns(pruned, iter, cfs.gcBefore());
+            return pruned;
+        }
+
+        public boolean isSatisfiedBy(ColumnFamily data)
+        {
+            // We enforce even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
+            // where the index returned a row which doesn't have the primary column when we actually read it
+            for (IndexExpression expression : clause)
+            {
+                // check column data vs expression
+                IColumn column = data.getColumn(expression.column_name);
+                if (column == null)
+                    return false;
+                int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
+                if (!satisfies(v, expression.op))
+                    return false;
+            }
+            return true;
+        }
+    }
+
+    private static class EmptyClauseFilter extends ExtendedFilter
+    {
+        public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults)
+        {
+            super(cfs, filter, maxResults);
+        }
+
+        public IFilter initialFilter()
+        {
+            return originalFilter;
+        }
+
+        public List<IndexExpression> getClause()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public IFilter getExtraFilter(ColumnFamily data)
+        {
+            return null;
+        }
+
+        public ColumnFamily prune(ColumnFamily data)
+        {
+            return data;
+        }
+
+        public boolean isSatisfiedBy(ColumnFamily data)
+        {
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index c2d2318..79feb96 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -70,7 +70,6 @@ public abstract class SecondaryIndex
      */
     public abstract void validateOptions() throws ConfigurationException;
 
-    
     /**
      * @return The name of the index
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 891313c..01b795f 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang.StringUtils;
@@ -143,6 +142,26 @@ public class SecondaryIndexManager
     }
 
     /**
+     * @return true if the indexes can handle the clause.
+     */
+    public boolean hasIndexFor(List<IndexExpression> clause)
+    {
+        if (clause == null || clause.isEmpty())
+            return false;
+
+        // A single clause is not expected to need multiple searchers, but since
+        // getIndexSearchersForQuery returns a list, check them all ...
+        List<SecondaryIndexSearcher> searchers = getIndexSearchersForQuery(clause);
+        if (searchers.isEmpty())
+            return false;
+
+        for (SecondaryIndexSearcher searcher : searchers)
+            if (!searcher.isIndexing(clause))
+                return false;
+        return true;
+    }
+
+    /**
      * Removes a existing index
      * @param column the indexed column to remove
      * @throws IOException 
@@ -485,9 +504,9 @@ public class SecondaryIndexManager
     /**
      * Get a list of IndexSearchers from the union of expression index types
      * @param clause the query clause
-     * @return the searchers to needed to query the index
+     * @return the searchers needed to query the index
      */
-    private List<SecondaryIndexSearcher> getIndexSearchersForQuery(IndexClause clause)
+    private List<SecondaryIndexSearcher> getIndexSearchersForQuery(List<IndexExpression> clause)
     {
         List<SecondaryIndexSearcher> indexSearchers = new ArrayList<SecondaryIndexSearcher>();
         
@@ -495,7 +514,7 @@ public class SecondaryIndexManager
  
         
         //Group columns by type
-        for (IndexExpression ix : clause.expressions)
+        for (IndexExpression ix : clause)
         {
             SecondaryIndex index = getIndexForColumn(ix.column_name);
             
@@ -531,7 +550,7 @@ public class SecondaryIndexManager
      * @param dataFilter the column range to restrict to
      * @return found indexed rows
      */
-    public List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter)
+    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
     {
         List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
                
@@ -543,6 +562,6 @@ public class SecondaryIndexManager
             throw new RuntimeException("Unable to search across multiple secondary index types");
         
         
-        return indexSearchers.get(0).search(clause, range, dataFilter);
+        return indexSearchers.get(0).search(clause, range, maxResults, dataFilter);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 4fe9f2a..6365c81 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 
@@ -40,52 +39,11 @@ public abstract class SecondaryIndexSearcher
         this.columns = columns;
         this.baseCfs = indexManager.baseCfs;
     }
-    
-    public static boolean satisfies(ColumnFamily data, IndexClause clause, IndexExpression first)
-    {
-        // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
-        // where the index returned a row which doesn't have the primarycolumn when we actually read it
-        for (IndexExpression expression : clause.expressions)
-        {
-            // check column data vs expression
-            IColumn column = data.getColumn(expression.column_name);
-            if (column == null)
-                return false;
-            int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
-            if (!satisfies(v, expression.op))
-                return false;
-        }
-        return true;
-    }
 
-    public static boolean satisfies(int comparison, IndexOperator op)
-    {
-        switch (op)
-        {
-            case EQ:
-                return comparison == 0;
-            case GTE:
-                return comparison >= 0;
-            case GT:
-                return comparison > 0;
-            case LTE:
-                return comparison <= 0;
-            case LT:
-                return comparison < 0;
-            default:
-                throw new IllegalStateException();
-        }
-    }
-    
-    public NamesQueryFilter getExtraFilter(IndexClause clause)
-    {
-        SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(baseCfs.getComparator());
-        for (IndexExpression expr : clause.expressions)
-        {
-            columns.add(expr.column_name);
-        }
-        return new NamesQueryFilter(columns);
-    }
-    
-    public abstract List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter);
+    public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter);
+
+    /**
+     * @return true if this index is able to handle the given clauses.
+     */
+    public abstract boolean isIndexing(List<IndexExpression> clause);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index c322a9a..4f975eb 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.index.keys;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
@@ -28,7 +29,7 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,11 +46,11 @@ public class KeysSearcher extends SecondaryIndexSearcher
         super(indexManager, columns);
     }
     
-    private IndexExpression highestSelectivityPredicate(IndexClause clause)
+    private IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
     {
         IndexExpression best = null;
         int bestMeanCount = Integer.MAX_VALUE;
-        for (IndexExpression expression : clause.expressions)
+        for (IndexExpression expression : clause)
         {
             //skip columns belonging to a different index type
             if(!columns.contains(expression.column_name))
@@ -77,181 +78,134 @@ public class KeysSearcher extends SecondaryIndexSearcher
                              baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
     }
 
-    private static boolean isIdentityFilter(SliceQueryFilter filter)
+    public boolean isIndexing(List<IndexExpression> clause)
     {
-        return filter.start.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-            && filter.finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-            && filter.count == Integer.MAX_VALUE;
+        return highestSelectivityPredicate(clause) != null;
     }
-    
+
     @Override
-    public List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter)
+    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
+    {
+        assert clause != null && !clause.isEmpty();
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults);
+        return baseCfs.filter(getIndexedIterator(range, filter), filter);
+    }
+
+    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
         // TODO: allow merge join instead of just one index + loop
-        IndexExpression primary = highestSelectivityPredicate(clause);
-        SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
+        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+        final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
         if (logger.isDebugEnabled())
             logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name));
         assert index != null;
-        DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
-
-        // if the slicepredicate doesn't contain all the columns for which we have expressions to evaluate,
-        // it needs to be expanded to include those too
-        IFilter firstFilter = dataFilter;
-        if (dataFilter instanceof SliceQueryFilter)
-        {
-            // if we have a high chance of getting all the columns in a single index slice, do that.
-            // otherwise, we'll create an extraFilter (lazily) to fetch by name the columns referenced by the additional expressions.
-            if (baseCfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
-            {
-                logger.debug("Expanding slice filter to entire row to cover additional expressions");
-                firstFilter = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                        ((SliceQueryFilter) dataFilter).reversed,
-                        Integer.MAX_VALUE);
-            }
-        }
-        else
-        {
-            logger.debug("adding columns to firstFilter to cover additional expressions");
-            // just add in columns that are not part of the resultset
-            assert dataFilter instanceof NamesQueryFilter;
-            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(baseCfs.getComparator());
-            for (IndexExpression expr : clause.expressions)
-            {
-                columns.add(expr.column_name);
-            }
-            if (columns.size() > 0)
-            {
-                columns.addAll(((NamesQueryFilter) dataFilter).columns);
-                firstFilter = new NamesQueryFilter(columns);
-            }
-        }
-
-        List<Row> rows = new ArrayList<Row>();
-        ByteBuffer startKey = clause.start_key;
-        QueryPath path = new QueryPath(baseCfs.columnFamily);
-
-        // we need to store last data key accessed to avoid duplicate results
-        // because in the while loop new iteration we can access the same column if start_key was not set
-        ByteBuffer lastDataKey = null;
-
-        // fetch row keys matching the primary expression, fetch the slice predicate for each
-        // and filter by remaining expressions.  repeat until finished w/ assigned range or index row is exhausted.
-        outer:
-        while (true)
+        final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
+
+        /*
+         * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+         * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest
+         * possible key having a given token. A fix would be to actually store the token along with the key in the
+         * indexed row.
+         */
+        final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        return new ColumnFamilyStore.AbstractScanIterator()
         {
-            /* we don't have a way to get the key back from the DK -- we just have a token --
-             * so, we need to loop after starting with start_key, until we get to keys in the given `range`.
-             * But, if the calling StorageProxy is doing a good job estimating data from each range, the range
-             * should be pretty close to `start_key`. */
-            if (logger.isDebugEnabled())
-                logger.debug(String.format("Scanning index %s starting with %s",
-                                           expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
-
-            // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
-            int count = Math.max(clause.count, 2);
-            QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
-                                                                 new QueryPath(index.getIndexCfs().getColumnFamilyName()),
-                                                                 startKey,
-                                                                 ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                                 false,
-                                                                 count);
-            ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
-            logger.debug("fetched {}", indexRow);
-            if (indexRow == null)
-                break;
+            private ByteBuffer lastSeenKey = startKey;
+            private Iterator<IColumn> indexColumns;
+            private final QueryPath path = new QueryPath(baseCfs.columnFamily);
+            private int columnsRead = Integer.MAX_VALUE;
 
-            ByteBuffer dataKey = null;
-            int n = 0;
-            for (IColumn column : indexRow.getSortedColumns())
+            protected Row computeNext()
             {
-                if (column.isMarkedForDelete())
-                {
-                    logger.debug("skipping {}",column.name());
-                    continue;
-                }
-                
-                dataKey = column.name();
-                n++;
-                
-                if(logger.isDebugEnabled())
-                    logger.debug("fetching {}",column.name());
-
-                DecoratedKey dk = baseCfs.partitioner.decorateKey(dataKey);
-                if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
-                    break outer;
-                if (!range.contains(dk) || dataKey.equals(lastDataKey))
-                    continue;
-
-                // get the row columns requested, and additional columns for the expressions if necessary
-                ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, firstFilter));
-                // While we the column family we'll get in the end should contains the primary clause column, the firstFilter may not have found it.
-                if (data == null)
-                    data = ColumnFamily.create(baseCfs.metadata);
-                logger.debug("fetched data row {}", data);
-                NamesQueryFilter extraFilter = null;
-                if (dataFilter instanceof SliceQueryFilter && !isIdentityFilter((SliceQueryFilter)dataFilter))
+                while (true)
                 {
-                    // we might have gotten the expression columns in with the main data slice, but
-                    // we can't know for sure until that slice is done.  So, we'll do the extra query
-                    // if we go through and any expression columns are not present.
-                    boolean needExtraFilter = false;
-                    for (IndexExpression expr : clause.expressions)
+                    if (indexColumns == null || !indexColumns.hasNext())
                     {
-                        if (data.getColumn(expr.column_name) == null)
+                        if (columnsRead < filter.maxResults)
                         {
-                            logger.debug("adding extraFilter to cover additional expressions");
-                            // Lazily creating extra filter
-                            needExtraFilter = true;
-                            break;
+                            logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, filter.maxResults);
+                            return endOfData();
                         }
-                    }
-                    if (needExtraFilter)
-                    {
-                        // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
-                        // why we do the dance of avoiding to query any column we already have (it's also more efficient anyway)
-                        extraFilter = getExtraFilter(clause);
-                        for (IndexExpression expr : clause.expressions)
+
+                        if (logger.isDebugEnabled())
+                            logger.debug(String.format("Scanning index %s starting with %s",
+                                                       expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
+
+                        // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
+                        int count = Math.max(filter.maxResults, 2);
+                        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+                                                                             new QueryPath(index.getIndexCfs().getColumnFamilyName()),
+                                                                             lastSeenKey,
+                                                                             endKey,
+                                                                             false,
+                                                                             count);
+                        ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
+                        logger.debug("fetched {}", indexRow);
+                        if (indexRow == null)
                         {
-                            if (data.getColumn(expr.column_name) != null)
-                                extraFilter.columns.remove(expr.column_name);
+                            logger.debug("no data, all done");
+                            return endOfData();
                         }
-                        assert !extraFilter.columns.isEmpty();
-                        ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter));
-                        if (cf != null)
-                            data.addAll(cf, HeapAllocator.instance);
-                    }
 
-                }
+                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+                        columnsRead = sortedColumns.size();
+                        indexColumns = sortedColumns.iterator();
+                        IColumn firstColumn = sortedColumns.iterator().next();
 
-                if (SecondaryIndexSearcher.satisfies(data, clause, primary))
-                {
-                    logger.debug("row {} satisfies all clauses", data);
-                    // cut the resultset back to what was requested, if necessary
-                    if (firstFilter != dataFilter || extraFilter != null)
-                    {
-                        ColumnFamily expandedData = data;
-                        data = expandedData.cloneMeShallow();
-                        IColumnIterator iter = dataFilter.getMemtableColumnIterator(expandedData, dk);
-                        new QueryFilter(dk, path, dataFilter).collateColumns(data, Collections.singletonList(iter), baseCfs.gcBefore());
+                        // Paging is racy, so it is possible the first column of a page is not the last seen one.
+                        if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
+                        {
+                            // skip the row we already saw w/ the last page of results
+                            indexColumns.next();
+                            columnsRead--;
+                            logger.debug("Skipping {}", baseCfs.getComparator().getString(firstColumn.name()));
+                        }
+                        else if (range instanceof Range && indexColumns.hasNext() && firstColumn.equals(startKey))
+                        {
+                            // skip key excluded by range
+                            indexColumns.next();
+                            columnsRead--;
+                            logger.debug("Skipping first key as range excludes it");
+                        }
                     }
 
-                    rows.add(new Row(dk, data));
-                }
+                    while (indexColumns.hasNext())
+                    {
+                        IColumn column = indexColumns.next();
+                        lastSeenKey = column.name();
+                        if (column.isMarkedForDelete())
+                        {
+                            logger.debug("skipping {}", column.name());
+                            continue;
+                        }
 
-                if (rows.size() == clause.count)
-                    break outer;
-            }
-            if (n < clause.count || startKey.equals(dataKey))
-                break;
+                        DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+                        if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
+                        {
+                            logger.debug("Reached end of assigned scan range");
+                            return endOfData();
+                        }
+                        if (!range.contains(dk))
+                        {
+                            logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
+                            continue;
+                        }
 
-            lastDataKey = startKey = dataKey;
-        }
+                        logger.debug("Returning index hit for {}", dk);
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
+                        // While the column family we'll get in the end should contain the primary clause column, the initialFilter may not have found it and can thus be null
+                        if (data == null)
+                            data = ColumnFamily.create(baseCfs.metadata);
+                        return new Row(dk, data);
+                    }
+                 }
+             }
 
-        return rows;
+            public void close() throws IOException {}
+        };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 0ece843..066b2e4 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -39,7 +39,10 @@ public class IndexScanVerbHandler implements IVerbHandler
         {
             IndexScanCommand command = IndexScanCommand.read(message);
             ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-            List<Row> rows = cfs.indexManager.search(command.index_clause, command.range, QueryFilter.getFilter(command.predicate, cfs.getComparator()));
+            List<Row> rows = cfs.indexManager.search(command.index_clause.expressions,
+                                                     command.range,
+                                                     command.index_clause.count,
+                                                     QueryFilter.getFilter(command.predicate, cfs.getComparator()));
             RangeSliceReply reply = new RangeSliceReply(rows);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index 971ccb3..2353b71 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -18,23 +18,38 @@
 
 package org.apache.cassandra.service;
 
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RangeSliceCommand;
 import org.apache.cassandra.db.RangeSliceReply;
+import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 
 public class RangeSliceVerbHandler implements IVerbHandler
 {
-
     private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
 
+    static List<Row> executeLocally(RangeSliceCommand command) throws ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+        IFilter columnFilter = QueryFilter.getFilter(command.predicate, cfs.getComparator());
+
+        if (cfs.indexManager.hasIndexFor(command.row_filter))
+            return cfs.search(command.row_filter, command.range, command.max_keys, columnFilter);
+        else
+            return cfs.getRangeSlice(command.super_column, command.range, command.max_keys, columnFilter, command.row_filter);
+    }
+
     public void doVerb(Message message, String id)
     {
         try
@@ -46,10 +61,7 @@ public class RangeSliceVerbHandler implements IVerbHandler
             }
             RangeSliceCommand command = RangeSliceCommand.read(message);
             ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-            RangeSliceReply reply = new RangeSliceReply(cfs.getRangeSlice(command.super_column,
-                                                                          command.range,
-                                                                          command.max_keys,
-                                                                          QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+            RangeSliceReply reply = new RangeSliceReply(executeLocally(command));
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 458b8ec..c4c69fd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -830,32 +830,46 @@ public class StorageProxy implements StorageProxyMBean
             List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
             for (AbstractBounds<RowPosition> range : ranges)
             {
-                List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+                RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
+                                                                  command.column_family,
+                                                                  command.super_column,
+                                                                  command.predicate,
+                                                                  range,
+                                                                  command.row_filter,
+                                                                  command.max_keys);
+
+                List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
                 DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
 
                 if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("local range slice");
-                    ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-                    rows.addAll(cfs.getRangeSlice(command.super_column,
-                                                range,
-                                                command.max_keys,
-                                                QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+
+                    try
+                    {
+                        rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd));
+                    }
+                    catch (ExecutionException e)
+                    {
+                        throw new RuntimeException(e.getCause());
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new AssertionError(e);
+                    }
                 }
                 else
                 {
-                    RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
-
                     // collect replies and resolve according to consistency level
-                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                    ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, liveEndpoints);
+                    ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
                     handler.assureSufficientLiveNodes();
                     for (InetAddress endpoint : handler.endpoints)
                     {
-                        MessagingService.instance().sendRR(c2, endpoint, handler);
+                        MessagingService.instance().sendRR(nodeCmd, endpoint, handler);
                         if (logger.isDebugEnabled())
-                            logger.debug("reading " + c2 + " from " + endpoint);
+                            logger.debug("reading " + nodeCmd + " from " + endpoint);
                     }
 
                     try
@@ -880,7 +894,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // if we're done, great, otherwise, move to the next range
-                if (rows.size() >= command.max_keys)
+                if (rows.size() >= nodeCmd.max_keys)
                     break;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1a71f04..4987617 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -668,7 +668,7 @@ public class CassandraServer implements Cassandra.Iface
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
         ThriftValidation.validateColumnParent(metadata, column_parent);
         ThriftValidation.validatePredicate(metadata, column_parent, predicate);
-        ThriftValidation.validateKeyRange(range);
+        ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
         ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
 
         List<Row> rows;
@@ -690,7 +690,7 @@ public class CassandraServer implements Cassandra.Iface
             schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.row_filter, range.count), consistency_level);
             }
             finally
             {
@@ -739,7 +739,11 @@ public class CassandraServer implements Cassandra.Iface
         List<Row> rows;
         try
         {
-            rows = StorageProxy.scan(keyspace, column_parent.column_family, index_clause, column_predicate, consistency_level);
+            rows = StorageProxy.scan(keyspace,
+                                     column_parent.column_family,
+                                     index_clause,
+                                     column_predicate,
+                                     consistency_level);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 76635d9..fd73c53 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -479,7 +479,7 @@ public class ThriftValidation
             validateColumnNames(metadata, column_parent, predicate.column_names);
     }
 
-    public static void validateKeyRange(KeyRange range) throws InvalidRequestException
+    public static void validateKeyRange(CFMetaData metadata, ByteBuffer superColumn, KeyRange range) throws InvalidRequestException
     {
         if ((range.start_key == null) != (range.end_key == null))
         {
@@ -508,22 +508,54 @@ public class ThriftValidation
             }
         }
 
+        validateFilterClauses(metadata, range.row_filter);
+
+        if (!isEmpty(range.row_filter) && superColumn != null)
+        {
+            throw new InvalidRequestException("super columns are not yet supported for indexing");
+        }
+        if (!isEmpty(range.row_filter) && range.start_key == null)
+        {
+            // TODO: our current KEYS indexes can't do that efficiently
+            // (without scanning *all* the keys in the range and simply applying the filter to discard them when they don't match)
+            // See KeySearcher.search()
+            throw new InvalidRequestException("filtered queries must use concrete keys rather than tokens");
+        }
+
         if (range.count <= 0)
         {
             throw new InvalidRequestException("maxRows must be positive");
         }
     }
 
+    private static boolean isEmpty(List<IndexExpression> clause)
+    {
+        return clause == null || clause.isEmpty();
+    }
+
     public static void validateIndexClauses(CFMetaData metadata, IndexClause index_clause)
     throws InvalidRequestException
     {
         if (index_clause.expressions.isEmpty())
             throw new InvalidRequestException("index clause list may not be empty");
+
+        if (!validateFilterClauses(metadata, index_clause.expressions))
+            throw new InvalidRequestException("No indexed columns present in index clause with operator EQ");
+    }
+
+    // return true if index_clause contains an indexed column with operator EQ
+    public static boolean validateFilterClauses(CFMetaData metadata, List<IndexExpression> index_clause)
+    throws InvalidRequestException
+    {
+        if (isEmpty(index_clause))
+            // no filter to apply
+            return false;
+
         Set<ByteBuffer> indexedColumns = Table.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.getIndexedColumns();
         AbstractType nameValidator =  ColumnFamily.getComparatorFor(metadata.ksName, metadata.cfName, null);
 
         boolean isIndexed = false;
-        for (IndexExpression expression : index_clause.expressions)
+        for (IndexExpression expression : index_clause)
         {
             try
             {
@@ -553,8 +585,7 @@ public class ThriftValidation
             isIndexed |= (expression.op == IndexOperator.EQ) && indexedColumns.contains(expression.column_name);
         }
 
-        if (!isIndexed)
-            throw new InvalidRequestException("No indexed columns present in index clause with operator EQ");
+        return isIndexed;
     }
 
     public static void validateCfDef(CfDef cf_def, CFMetaData old) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/test/system/test_thrift_server.py
----------------------------------------------------------------------
diff --git a/test/system/test_thrift_server.py b/test/system/test_thrift_server.py
index ab8e1bd..fdc8736 100644
--- a/test/system/test_thrift_server.py
+++ b/test/system/test_thrift_server.py
@@ -213,8 +213,8 @@ def _expect_exception(fn, type_):
 def _expect_missing(fn):
     _expect_exception(fn, NotFoundException)
 
-def get_range_slice(client, parent, predicate, start, end, count, cl):
-    kr = KeyRange(start, end, count=count)
+def get_range_slice(client, parent, predicate, start, end, count, cl, row_filter=None):
+    kr = KeyRange(start, end, count=count, row_filter=row_filter)
     return client.get_range_slices(parent, predicate, kr, cl)
     
 
@@ -977,7 +977,7 @@ class TestMutations(ThriftTester):
             client.insert(key, ColumnParent('Standard1'), Column(key, 'v', 0), ConsistencyLevel.ONE)
 
         def check_slices_against_keys(keyList, sliceList):
-            assert len(keyList) == len(sliceList)
+            assert len(keyList) == len(sliceList), "%d vs %d" % (len(keyList), len(sliceList))
             for key, ks in zip(keyList, sliceList):
                 assert key == ks.key
         
@@ -1484,11 +1484,14 @@ class TestMutations(ThriftTester):
         client.insert('key3', ColumnParent('ToBeIndexed'), Column('birthdate', _i64(3), 0), ConsistencyLevel.ONE)
         client.insert('key3', ColumnParent('ToBeIndexed'), Column('b', _i64(3), 0), ConsistencyLevel.ONE)
 
-        # Should fail without index
+        # First without index
         cp = ColumnParent('ToBeIndexed')
         sp = SlicePredicate(slice_range=SliceRange('', ''))
-        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
-        _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+        clause = FilterClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
+        assert len(result) == 1, result
+        assert result[0].key == 'key1'
+        assert len(result[0].columns) == 1, result[0].columns
 
         # add an index on 'birthdate'
         ks1 = client.describe_keyspace('Keyspace1')
@@ -1507,11 +1510,8 @@ class TestMutations(ThriftTester):
         # sleep a bit to give time for the index to build.
         time.sleep(0.5)
         
-        # simple query on one index expression
-        cp = ColumnParent('ToBeIndexed')
-        sp = SlicePredicate(slice_range=SliceRange('', ''))
-        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
-        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        # repeat query on one index expression
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
         assert len(result) == 1, result
         assert result[0].key == 'key1'
         assert len(result[0].columns) == 1, result[0].columns
@@ -1857,21 +1857,21 @@ class TestMutations(ThriftTester):
         # simple query on one index expression
         cp = ColumnParent('Indexed1')
         sp = SlicePredicate(slice_range=SliceRange('', ''))
-        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
-        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        clause = FilterClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
         assert len(result) == 1, result
         assert result[0].key == 'key1'
         assert len(result[0].columns) == 1, result[0].columns
 
-        # solo unindexed expression is invalid
-        clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(1))], '')
-        _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+        # without index
+        clause = FilterClause([IndexExpression('b', IndexOperator.EQ, _i64(1))])
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
+        assert len(result) == 0, result
 
         # but unindexed expression added to indexed one is ok
-        clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(3)),
-                              IndexExpression('birthdate', IndexOperator.EQ, _i64(3))],
-                             '')
-        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        clause = FilterClause([IndexExpression('b', IndexOperator.EQ, _i64(3)),
+                               IndexExpression('birthdate', IndexOperator.EQ, _i64(3))])
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
         assert len(result) == 1, result
         assert result[0].key == 'key3'
         assert len(result[0].columns) == 2, result[0].columns
@@ -1885,20 +1885,20 @@ class TestMutations(ThriftTester):
         client.insert('key1', ColumnParent('Indexed3'), Column(u, 'a', 0), ConsistencyLevel.ONE)
         client.insert('key1', ColumnParent('Indexed3'), Column(u2, 'b', 0), ConsistencyLevel.ONE)
         # name comparator + data validator of incompatible types -- see CASSANDRA-2347
-        clause = IndexClause([IndexExpression(u, IndexOperator.EQ, 'a'),
-                              IndexExpression(u2, IndexOperator.EQ, 'b')], '')
-        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        clause = FilterClause([IndexExpression(u, IndexOperator.EQ, 'a'),
+                              IndexExpression(u2, IndexOperator.EQ, 'b')])
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
         assert len(result) == 1, result
 
         cp = ColumnParent('Indexed2') # timeuuid name, long values
 
         # name must be valid (TimeUUID)
-        clause = IndexClause([IndexExpression('foo', IndexOperator.EQ, uuid.UUID('00000000-0000-1000-0000-000000000000').bytes)], '')
-        _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+        clause = FilterClause([IndexExpression('foo', IndexOperator.EQ, uuid.UUID('00000000-0000-1000-0000-000000000000').bytes)])
+        _expect_exception(lambda: get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause), InvalidRequestException)
         
         # value must be valid (TimeUUID)
-        clause = IndexClause([IndexExpression(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, IndexOperator.EQ, "foo")], '')
-        _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+        clause = FilterClause([IndexExpression(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, IndexOperator.EQ, "foo")])
+        _expect_exception(lambda: get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause), InvalidRequestException)
         
     def test_index_scan_expiring(self):
         """ Test that column ttled expires from KEYS index"""
@@ -1906,13 +1906,13 @@ class TestMutations(ThriftTester):
         client.insert('key1', ColumnParent('Indexed1'), Column('birthdate', _i64(1), 0, 1), ConsistencyLevel.ONE)
         cp = ColumnParent('Indexed1')
         sp = SlicePredicate(slice_range=SliceRange('', ''))
-        clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
+        clause = FilterClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
         # query before expiration
-        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
         assert len(result) == 1, result
         # wait for expiration and requery
         time.sleep(2)
-        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
         assert len(result) == 0, result
      
     def test_column_not_found_quorum(self): 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index c80486e..0248869 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -133,7 +133,8 @@ public class Util
         return cfs.getRangeSlice(superColumn,
                                  new Bounds<Token>(min, min).toRowBounds(),
                                  10000,
-                                 new IdentityQueryFilter());
+                                 new IdentityQueryFilter(),
+                                 null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 7edab90..b1b34ee 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -81,7 +80,7 @@ public class CleanupTest extends CleanupHelper
         // record max timestamps of the sstables pre-cleanup
         List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
 
-        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        rows = Util.getRangeSlice(cfs);
         assertEquals(LOOPS, rows.size());
 
         // with one token in the ring, owned by the local node, cleanup should be a no-op
@@ -91,7 +90,7 @@ public class CleanupTest extends CleanupHelper
         assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
 
         // check data is still there
-        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        rows = Util.getRangeSlice(cfs);
         assertEquals(LOOPS, rows.size());
     }
 
@@ -106,7 +105,7 @@ public class CleanupTest extends CleanupHelper
 
         // insert data and verify we get it back w/ range query
         fillCF(cfs, LOOPS);
-        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        rows = Util.getRangeSlice(cfs);
         assertEquals(LOOPS, rows.size());
 
         SecondaryIndex index = cfs.indexManager.getIndexForColumn(COLUMN);
@@ -116,11 +115,11 @@ public class CleanupTest extends CleanupHelper
 
         // verify we get it back w/ index query too
         IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE);
-        IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE);
+        List<IndexExpression> clause = Arrays.asList(expr);
         IFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        rows = table.getColumnFamilyStore(CF1).search(clause, range, filter);
+        rows = table.getColumnFamilyStore(CF1).search(clause, range, Integer.MAX_VALUE, filter);
         assertEquals(LOOPS, rows.size());
 
        // we don't allow cleanup when the local host has no range to avoid wiping out all data when a node has not joined the ring.
@@ -135,14 +134,14 @@ public class CleanupTest extends CleanupHelper
         CompactionManager.instance.performCleanup(cfs, new NodeId.OneShotRenewer());
 
         // row data should be gone
-        rows = cfs.getRangeSlice(null, range, 1000, new IdentityQueryFilter());
+        rows = Util.getRangeSlice(cfs);
         assertEquals(0, rows.size());
 
         // not only should it be gone but there should be no data on disk, not even tombstones
         assert cfs.getSSTables().isEmpty();
 
         // 2ary indexes should result in no results, too (although tombstones won't be gone until compacted)
-        rows = cfs.search(clause, range, filter);
+        rows = cfs.search(clause, range, Integer.MAX_VALUE, filter);
         assertEquals(0, rows.size());
     }