You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/01/02 17:08:49 UTC
[3/4] Merge get_indexed_slices with get_range_slices
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 15478eb..940e00a 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -171,9 +171,21 @@ public class QueryProcessor
AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(startKey, finishKey);
// XXX: Our use of Thrift structs internally makes me Sad. :(
- SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata,variables);
+ SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata, variables);
validateSlicePredicate(metadata, thriftSlicePredicate);
+ List<IndexExpression> expressions = new ArrayList<IndexExpression>();
+ for (Relation columnRelation : select.getColumnRelations())
+ {
+ // Left and right side of relational expression encoded according to comparator/validator.
+ ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator, variables);
+ ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity), variables);
+
+ expressions.add(new IndexExpression(entity,
+ IndexOperator.valueOf(columnRelation.operator().toString()),
+ value));
+ }
+
int limit = select.isKeyRange() && select.getKeyStart() != null
? select.getNumRecords() + 1
: select.getNumRecords();
@@ -185,6 +197,7 @@ public class QueryProcessor
null,
thriftSlicePredicate,
bounds,
+ expressions,
limit),
select.getConsistencyLevel());
}
@@ -218,51 +231,7 @@ public class QueryProcessor
return rows.subList(0, select.getNumRecords() < rows.size() ? select.getNumRecords() : rows.size());
}
-
- private static List<org.apache.cassandra.db.Row> getIndexedSlices(CFMetaData metadata, SelectStatement select, List<String> variables)
- throws TimedOutException, UnavailableException, InvalidRequestException
- {
- // XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
- SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata, variables);
- validateSlicePredicate(metadata, thriftSlicePredicate);
-
- List<IndexExpression> expressions = new ArrayList<IndexExpression>();
- for (Relation columnRelation : select.getColumnRelations())
- {
- // Left and right side of relational expression encoded according to comparator/validator.
- ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator, variables);
- ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity), variables);
-
- expressions.add(new IndexExpression(entity,
- IndexOperator.valueOf(columnRelation.operator().toString()),
- value));
- }
- AbstractType<?> keyType = Schema.instance.getCFMetaData(metadata.ksName, select.getColumnFamily()).getKeyValidator();
- ByteBuffer startKey = (!select.isKeyRange()) ? (new Term()).getByteBuffer() : select.getKeyStart().getByteBuffer(keyType, variables);
- IndexClause thriftIndexClause = new IndexClause(expressions, startKey, select.getNumRecords());
-
- List<org.apache.cassandra.db.Row> rows;
- try
- {
- rows = StorageProxy.scan(metadata.ksName,
- select.getColumnFamily(),
- thriftIndexClause,
- thriftSlicePredicate,
- select.getConsistencyLevel());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- catch (TimeoutException e)
- {
- throw new TimedOutException();
- }
-
- return rows;
- }
-
private static void batchUpdate(ClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency, List<String> variables )
throws InvalidRequestException, UnavailableException, TimedOutException
{
@@ -544,16 +513,7 @@ public class QueryProcessor
}
else
{
- // Range query
- if ((select.getKeyFinish() != null) || (select.getColumnRelations().size() == 0))
- {
- rows = multiRangeSlice(metadata, select, variables);
- }
- // Index scan
- else
- {
- rows = getIndexedSlices(metadata, select, variables);
- }
+ rows = multiRangeSlice(metadata, select, variables);
}
// count resultset is a single column named "count"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 907ccb4..706f43b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,10 +30,10 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import javax.management.*;
+import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.service.CacheService;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +48,8 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
@@ -59,7 +61,7 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.IntervalTree.Interval;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -1265,87 +1267,139 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return columns;
}
+ public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row> {}
+
/**
- * Fetch a range of rows and columns from memtables/sstables.
+ * Iterate over a range of rows and columns from memtables/sstables.
*
* @param superColumn optional SuperColumn to slice subcolumns of; null to slice top-level columns
* @param range Either a Bounds, which includes start key, or a Range, which does not.
- * @param maxResults Maximum rows to return
* @param columnFilter description of the columns we're interested in for each row
- * @return true if we found all keys we were looking for, otherwise false
*/
- public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter)
+ public AbstractScanIterator getSequentialIterator(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, IFilter columnFilter)
{
assert range instanceof Bounds
|| !((Range)range).isWrapAround() || range.right.isMinimum()
: range;
- RowPosition startWith = range.left;
- RowPosition stopAt = range.right;
+ final RowPosition startWith = range.left;
+ final RowPosition stopAt = range.right;
QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter);
- int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
List<Row> rows;
- ViewFragment view = markReferenced(startWith, stopAt);
+ final ViewFragment view = markReferenced(startWith, stopAt);
try
{
- CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
- rows = new ArrayList<Row>();
+ final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
+ final int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
- try
+ return new AbstractScanIterator()
{
- // pull rows out of the iterator
boolean first = true;
- while (iterator.hasNext())
+
+ protected Row computeNext()
{
+ // pull a row out of the iterator
+ if (!iterator.hasNext())
+ return endOfData();
+
Row current = iterator.next();
DecoratedKey key = current.key;
if (!stopAt.isMinimum() && stopAt.compareTo(key) < 0)
- return rows;
+ return endOfData();
// skip first one
if (range instanceof Bounds || !first || !key.equals(startWith))
{
- // TODO this is necessary because when we collate supercolumns together, we don't check
- // their subcolumns for relevance, so we need to do a second prune post facto here.
- rows.add(current.cf != null && current.cf.isSuper()
- ? new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore))
- : current);
if (logger.isDebugEnabled())
logger.debug("scanned " + key);
+ // TODO this is necessary because when we collate supercolumns together, we don't check
+ // their subcolumns for relevance, so we need to do a second prune post facto here.
+ return current.cf != null && current.cf.isSuper()
+ ? new Row(current.key, removeDeleted(current.cf, gcBefore))
+ : current;
}
first = false;
- if (rows.size() >= maxResults)
- return rows;
+ return computeNext();
}
- }
- finally
- {
- try
- {
- iterator.close();
- }
- catch (IOException e)
+
+ public void close() throws IOException
{
- throw new IOError(e);
+ SSTableReader.releaseReferences(view.sstables);
+ try
+ {
+ iterator.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
- }
+ };
}
- finally
+ catch (RuntimeException e)
{
- // separate finally block to release references in case getIterator() throws
+ // In case getIterator() throws; otherwise the iterator's close method releases the references.
SSTableReader.releaseReferences(view.sstables);
+ throw e;
}
-
- return rows;
}
- public List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter)
- {
- return indexManager.search(clause, range, dataFilter);
+ public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IFilter columnFilter, List<IndexExpression> rowFilter)
+ {
+ return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults));
+ }
+
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
+ {
+ return indexManager.search(clause, range, maxResults, dataFilter);
+ }
+
+ public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
+ {
+ List<Row> rows = new ArrayList<Row>();
+ try
+ {
+ while (rowIterator.hasNext() && rows.size() < filter.maxResults)
+ {
+ // get the raw columns requested, and additional columns for the expressions if necessary
+ Row rawRow = rowIterator.next();
+ ColumnFamily data = rawRow.cf;
+
+ // fetch any columns referenced by the filter expressions that the initial read did not return
+ IFilter extraFilter = filter.getExtraFilter(data);
+ if (extraFilter != null)
+ {
+ QueryPath path = new QueryPath(columnFamily);
+ ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, path, extraFilter));
+ if (cf != null)
+ data.addAll(cf, HeapAllocator.instance);
+ }
+
+ if (!filter.isSatisfiedBy(data))
+ continue;
+
+ logger.debug("{} satisfies all filter expressions", data);
+ // cut the resultset back to what was requested, if necessary
+ data = filter.prune(data);
+ rows.add(new Row(rawRow.key, data));
+ }
+ return rows;
+ }
+ finally
+ {
+ try
+ {
+ rowIterator.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
}
public AbstractType getComparator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 5df816d..272d415 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -348,7 +348,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
IPartitioner p = StorageService.getPartitioner();
Range range = new Range(p.getMinimumToken(), p.getMinimumToken(), p);
IFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
- List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter);
+ List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter, null);
for (Row row : rows)
{
Token<?> token = StorageService.getPartitioner().getTokenFactory().fromByteArray(row.key.key);
@@ -434,7 +434,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
List<Row> rows;
try
{
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index cc200ae..4ab821e 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -39,6 +39,8 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
@@ -47,15 +49,17 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
-import org.apache.cassandra.thrift.TBinaryProtocol;
public class RangeSliceCommand implements MessageProducer, IReadCommand
{
@@ -67,16 +71,22 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand
public final ByteBuffer super_column;
public final SlicePredicate predicate;
+ public final List<IndexExpression> row_filter;
public final AbstractBounds<RowPosition> range;
public final int max_keys;
- public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys)
+ public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys)
{
- this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, max_keys);
+ this(keyspace, column_family, super_column, predicate, range, null, max_keys);
}
- public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, int max_keys)
+ public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int max_keys)
+ {
+ this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, max_keys);
+ }
+
+ public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, SlicePredicate predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int max_keys)
{
this.keyspace = keyspace;
this.column_family = column_family;
@@ -84,6 +94,7 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand
this.predicate = predicate;
this.range = range;
this.max_keys = max_keys;
+ this.row_filter = row_filter;
}
public Message getMessage(Integer version) throws IOException
@@ -105,6 +116,7 @@ public class RangeSliceCommand implements MessageProducer, IReadCommand
", predicate=" + predicate +
", range=" + range +
", max_keys=" + max_keys +
+ ", row_filter =" + row_filter +
'}';
}
@@ -134,6 +146,20 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
FBUtilities.serialize(ser, sliceCommand.predicate, dos);
+
+ if (version >= MessagingService.VERSION_11)
+ {
+ if (sliceCommand.row_filter == null)
+ {
+ dos.writeInt(0);
+ }
+ else
+ {
+ dos.writeInt(sliceCommand.row_filter.size());
+ for (IndexExpression expr : sliceCommand.row_filter)
+ FBUtilities.serialize(ser, expr, dos);
+ }
+ }
AbstractBounds.serializer().serialize(sliceCommand.range, dos, version);
dos.writeInt(sliceCommand.max_keys);
}
@@ -141,24 +167,37 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
public RangeSliceCommand deserialize(DataInput dis, int version) throws IOException
{
String keyspace = dis.readUTF();
- String column_family = dis.readUTF();
+ String columnFamily = dis.readUTF();
int scLength = dis.readInt();
- ByteBuffer super_column = null;
+ ByteBuffer superColumn = null;
if (scLength > 0)
{
byte[] buf = new byte[scLength];
dis.readFully(buf);
- super_column = ByteBuffer.wrap(buf);
+ superColumn = ByteBuffer.wrap(buf);
}
TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
SlicePredicate pred = new SlicePredicate();
FBUtilities.deserialize(dser, pred, dis);
+
+ List<IndexExpression> rowFilter = null;
+ if (version >= MessagingService.VERSION_11)
+ {
+ int filterCount = dis.readInt();
+ rowFilter = new ArrayList<IndexExpression>(filterCount);
+ for (int i = 0; i < filterCount; i++)
+ {
+ IndexExpression expr = new IndexExpression();
+ FBUtilities.deserialize(dser, expr, dis);
+ rowFilter.add(expr);
+ }
+ }
AbstractBounds<RowPosition> range = AbstractBounds.serializer().deserialize(dis, version).toRowBounds();
- int max_keys = dis.readInt();
- return new RangeSliceCommand(keyspace, column_family, super_column, pred, range, max_keys);
+ int maxKeys = dis.readInt();
+ return new RangeSliceCommand(keyspace, columnFamily, superColumn, pred, range, rowFilter, maxKeys);
}
public long serializedSize(RangeSliceCommand rangeSliceCommand, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
new file mode 100644
index 0000000..bf56e99
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -0,0 +1,250 @@
+package org.apache.cassandra.db.filter;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Extends a column filter (IFilter) to include a number of IndexExpression.
+ */
+public abstract class ExtendedFilter
+{
+ private static Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
+
+ public final ColumnFamilyStore cfs;
+ public final int maxResults;
+ protected final IFilter originalFilter;
+
+ public static ExtendedFilter create(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults)
+ {
+ if (clause == null || clause.isEmpty())
+ return new EmptyClauseFilter(cfs, filter, maxResults);
+ else
+ return new FilterWithClauses(cfs, filter, clause, maxResults);
+ }
+
+ protected ExtendedFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults)
+ {
+ assert cfs != null;
+ assert filter != null;
+ this.cfs = cfs;
+ this.originalFilter = filter;
+ this.maxResults = maxResults;
+ }
+
+ /** The initial filter we'll do our first slice with (either the original or a superset of it) */
+ public abstract IFilter initialFilter();
+
+ public abstract List<IndexExpression> getClause();
+
+ /**
+ * Returns a filter to query the columns from the clause that the initial slice filter may not have caught.
+ * @param data the data retrieved by the initial filter
+ * @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row)
+ */
+ public abstract IFilter getExtraFilter(ColumnFamily data);
+
+ /**
+ * @return data pruned down to the columns originally asked for
+ */
+ public abstract ColumnFamily prune(ColumnFamily data);
+
+ /**
+ * @return true if the provided data satisfies all the expressions from
+ * the clause of this filter.
+ */
+ public abstract boolean isSatisfiedBy(ColumnFamily data);
+
+ public static boolean satisfies(int comparison, IndexOperator op)
+ {
+ switch (op)
+ {
+ case EQ:
+ return comparison == 0;
+ case GTE:
+ return comparison >= 0;
+ case GT:
+ return comparison > 0;
+ case LTE:
+ return comparison <= 0;
+ case LT:
+ return comparison < 0;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private static class FilterWithClauses extends ExtendedFilter
+ {
+ protected final List<IndexExpression> clause;
+ protected final IFilter initialFilter;
+
+ public FilterWithClauses(ColumnFamilyStore cfs, IFilter filter, List<IndexExpression> clause, int maxResults)
+ {
+ super(cfs, filter, maxResults);
+ assert clause != null;
+ this.clause = clause;
+ this.initialFilter = computeInitialFilter();
+ }
+
+ /** Sets up the initial filter. */
+ private IFilter computeInitialFilter()
+ {
+ if (originalFilter instanceof SliceQueryFilter)
+ {
+ // if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
+ // otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
+ if (cfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
+ {
+ logger.debug("Expanding slice filter to entire row to cover additional expressions");
+ return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ((SliceQueryFilter) originalFilter).reversed,
+ Integer.MAX_VALUE);
+ }
+ }
+ else
+ {
+ logger.debug("adding columns to original Filter to cover additional expressions");
+ assert originalFilter instanceof NamesQueryFilter;
+ SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+ for (IndexExpression expr : clause)
+ {
+ columns.add(expr.column_name);
+ }
+ if (columns.size() > 0)
+ {
+ columns.addAll(((NamesQueryFilter) originalFilter).columns);
+ return new NamesQueryFilter(columns);
+ }
+ }
+ return originalFilter;
+ }
+
+ public IFilter initialFilter()
+ {
+ return initialFilter;
+ }
+
+ public List<IndexExpression> getClause()
+ {
+ return clause;
+ }
+
+ /*
+ * We may need an extra query only if the original was a slice query (and thus may have missed the expressions for the clause).
+ * Even then, there is no point in doing an extra query if the original filter grabbed the whole row.
+ * Lastly, we only need the extra query if we haven't yet got all the expressions from the clause.
+ */
+ private boolean needsExtraQuery(ColumnFamily data)
+ {
+ if (!(originalFilter instanceof SliceQueryFilter))
+ return false;
+
+ SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
+ // Check if we've fetched the whole row
+ if (filter.start.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ && filter.finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ && filter.count == Integer.MAX_VALUE)
+ return false;
+
+ for (IndexExpression expr : clause)
+ {
+ if (data.getColumn(expr.column_name) == null)
+ {
+ logger.debug("adding extraFilter to cover additional expressions");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public IFilter getExtraFilter(ColumnFamily data)
+ {
+ if (!needsExtraQuery(data))
+ return null;
+
+ // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
+ // why we do the dance of avoiding to query any column we already have (it's also more efficient anyway)
+ SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+ for (IndexExpression expr : clause)
+ {
+ if (data.getColumn(expr.column_name) == null)
+ columns.add(expr.column_name);
+ }
+ assert !columns.isEmpty();
+ return new NamesQueryFilter(columns);
+ }
+
+ public ColumnFamily prune(ColumnFamily data)
+ {
+ if (initialFilter == originalFilter)
+ return data;
+ ColumnFamily pruned = data.cloneMeShallow();
+ IColumnIterator iter = originalFilter.getMemtableColumnIterator(data, null);
+ originalFilter.collectReducedColumns(pruned, iter, cfs.gcBefore());
+ return pruned;
+ }
+
+ public boolean isSatisfiedBy(ColumnFamily data)
+ {
+ // We enforce even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
+ // where the index returned a row which doesn't have the primary column when we actually read it
+ for (IndexExpression expression : clause)
+ {
+ // check column data vs expression
+ IColumn column = data.getColumn(expression.column_name);
+ if (column == null)
+ return false;
+ int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
+ if (!satisfies(v, expression.op))
+ return false;
+ }
+ return true;
+ }
+ }
+
+ private static class EmptyClauseFilter extends ExtendedFilter
+ {
+ public EmptyClauseFilter(ColumnFamilyStore cfs, IFilter filter, int maxResults)
+ {
+ super(cfs, filter, maxResults);
+ }
+
+ public IFilter initialFilter()
+ {
+ return originalFilter;
+ }
+
+ public List<IndexExpression> getClause()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public IFilter getExtraFilter(ColumnFamily data)
+ {
+ return null;
+ }
+
+ public ColumnFamily prune(ColumnFamily data)
+ {
+ return data;
+ }
+
+ public boolean isSatisfiedBy(ColumnFamily data)
+ {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index c2d2318..79feb96 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -70,7 +70,6 @@ public abstract class SecondaryIndex
*/
public abstract void validateOptions() throws ConfigurationException;
-
/**
* @return The name of the index
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 891313c..01b795f 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.LocalToken;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.lang.StringUtils;
@@ -143,6 +142,26 @@ public class SecondaryIndexManager
}
/**
+ * @return true if the indexes can handle the clause.
+ */
+ public boolean hasIndexFor(List<IndexExpression> clause)
+ {
+ if (clause == null || clause.isEmpty())
+ return false;
+
+ // It doesn't seem a clause can have multiple searchers, but since
+ // getIndexSearchersForQuery returns a list ...
+ List<SecondaryIndexSearcher> searchers = getIndexSearchersForQuery(clause);
+ if (searchers.isEmpty())
+ return false;
+
+ for (SecondaryIndexSearcher searcher : searchers)
+ if (!searcher.isIndexing(clause))
+ return false;
+ return true;
+ }
+
+ /**
* Removes a existing index
* @param column the indexed column to remove
* @throws IOException
@@ -485,9 +504,9 @@ public class SecondaryIndexManager
/**
* Get a list of IndexSearchers from the union of expression index types
* @param clause the query clause
- * @return the searchers to needed to query the index
+ * @return the searchers needed to query the index
*/
- private List<SecondaryIndexSearcher> getIndexSearchersForQuery(IndexClause clause)
+ private List<SecondaryIndexSearcher> getIndexSearchersForQuery(List<IndexExpression> clause)
{
List<SecondaryIndexSearcher> indexSearchers = new ArrayList<SecondaryIndexSearcher>();
@@ -495,7 +514,7 @@ public class SecondaryIndexManager
//Group columns by type
- for (IndexExpression ix : clause.expressions)
+ for (IndexExpression ix : clause)
{
SecondaryIndex index = getIndexForColumn(ix.column_name);
@@ -531,7 +550,7 @@ public class SecondaryIndexManager
* @param dataFilter the column range to restrict to
* @return found indexed rows
*/
- public List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter)
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
{
List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
@@ -543,6 +562,6 @@ public class SecondaryIndexManager
throw new RuntimeException("Unable to search across multiple secondary index types");
- return indexSearchers.get(0).search(clause, range, dataFilter);
+ return indexSearchers.get(0).search(clause, range, maxResults, dataFilter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 4fe9f2a..6365c81 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -40,52 +39,11 @@ public abstract class SecondaryIndexSearcher
this.columns = columns;
this.baseCfs = indexManager.baseCfs;
}
-
- public static boolean satisfies(ColumnFamily data, IndexClause clause, IndexExpression first)
- {
- // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
- // where the index returned a row which doesn't have the primarycolumn when we actually read it
- for (IndexExpression expression : clause.expressions)
- {
- // check column data vs expression
- IColumn column = data.getColumn(expression.column_name);
- if (column == null)
- return false;
- int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
- if (!satisfies(v, expression.op))
- return false;
- }
- return true;
- }
- public static boolean satisfies(int comparison, IndexOperator op)
- {
- switch (op)
- {
- case EQ:
- return comparison == 0;
- case GTE:
- return comparison >= 0;
- case GT:
- return comparison > 0;
- case LTE:
- return comparison <= 0;
- case LT:
- return comparison < 0;
- default:
- throw new IllegalStateException();
- }
- }
-
- public NamesQueryFilter getExtraFilter(IndexClause clause)
- {
- SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(baseCfs.getComparator());
- for (IndexExpression expr : clause.expressions)
- {
- columns.add(expr.column_name);
- }
- return new NamesQueryFilter(columns);
- }
-
- public abstract List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter);
+ public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter);
+
+ /**
+ * @return true if this index is able to handle the given clauses.
+ */
+ public abstract boolean isIndexing(List<IndexExpression> clause);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index c322a9a..4f975eb 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.index.keys;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -28,7 +29,7 @@ import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,11 +46,11 @@ public class KeysSearcher extends SecondaryIndexSearcher
super(indexManager, columns);
}
- private IndexExpression highestSelectivityPredicate(IndexClause clause)
+ private IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
{
IndexExpression best = null;
int bestMeanCount = Integer.MAX_VALUE;
- for (IndexExpression expression : clause.expressions)
+ for (IndexExpression expression : clause)
{
//skip columns belonging to a different index type
if(!columns.contains(expression.column_name))
@@ -77,181 +78,134 @@ public class KeysSearcher extends SecondaryIndexSearcher
baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
}
- private static boolean isIdentityFilter(SliceQueryFilter filter)
+ public boolean isIndexing(List<IndexExpression> clause)
{
- return filter.start.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- && filter.finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- && filter.count == Integer.MAX_VALUE;
+ return highestSelectivityPredicate(clause) != null;
}
-
+
@Override
- public List<Row> search(IndexClause clause, AbstractBounds<RowPosition> range, IFilter dataFilter)
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IFilter dataFilter)
+ {
+ assert clause != null && !clause.isEmpty();
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults);
+ return baseCfs.filter(getIndexedIterator(range, filter), filter);
+ }
+
+ public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
// TODO: allow merge join instead of just one index + loop
- IndexExpression primary = highestSelectivityPredicate(clause);
- SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
+ final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+ final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
if (logger.isDebugEnabled())
logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name));
assert index != null;
- DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
-
- // if the slicepredicate doesn't contain all the columns for which we have expressions to evaluate,
- // it needs to be expanded to include those too
- IFilter firstFilter = dataFilter;
- if (dataFilter instanceof SliceQueryFilter)
- {
- // if we have a high chance of getting all the columns in a single index slice, do that.
- // otherwise, we'll create an extraFilter (lazily) to fetch by name the columns referenced by the additional expressions.
- if (baseCfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
- {
- logger.debug("Expanding slice filter to entire row to cover additional expressions");
- firstFilter = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ((SliceQueryFilter) dataFilter).reversed,
- Integer.MAX_VALUE);
- }
- }
- else
- {
- logger.debug("adding columns to firstFilter to cover additional expressions");
- // just add in columns that are not part of the resultset
- assert dataFilter instanceof NamesQueryFilter;
- SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(baseCfs.getComparator());
- for (IndexExpression expr : clause.expressions)
- {
- columns.add(expr.column_name);
- }
- if (columns.size() > 0)
- {
- columns.addAll(((NamesQueryFilter) dataFilter).columns);
- firstFilter = new NamesQueryFilter(columns);
- }
- }
-
- List<Row> rows = new ArrayList<Row>();
- ByteBuffer startKey = clause.start_key;
- QueryPath path = new QueryPath(baseCfs.columnFamily);
-
- // we need to store last data key accessed to avoid duplicate results
- // because in the while loop new iteration we can access the same column if start_key was not set
- ByteBuffer lastDataKey = null;
-
- // fetch row keys matching the primary expression, fetch the slice predicate for each
- // and filter by remaining expressions. repeat until finished w/ assigned range or index row is exhausted.
- outer:
- while (true)
+ final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
+
+ /*
+ * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+ * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest
+ * possible key having a given token. A fix would be to actually store the token along with the key in the
+ * indexed row.
+ */
+ final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+ return new ColumnFamilyStore.AbstractScanIterator()
{
- /* we don't have a way to get the key back from the DK -- we just have a token --
- * so, we need to loop after starting with start_key, until we get to keys in the given `range`.
- * But, if the calling StorageProxy is doing a good job estimating data from each range, the range
- * should be pretty close to `start_key`. */
- if (logger.isDebugEnabled())
- logger.debug(String.format("Scanning index %s starting with %s",
- expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
-
- // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
- int count = Math.max(clause.count, 2);
- QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
- new QueryPath(index.getIndexCfs().getColumnFamilyName()),
- startKey,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- count);
- ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
- logger.debug("fetched {}", indexRow);
- if (indexRow == null)
- break;
+ private ByteBuffer lastSeenKey = startKey;
+ private Iterator<IColumn> indexColumns;
+ private final QueryPath path = new QueryPath(baseCfs.columnFamily);
+ private int columnsRead = Integer.MAX_VALUE;
- ByteBuffer dataKey = null;
- int n = 0;
- for (IColumn column : indexRow.getSortedColumns())
+ protected Row computeNext()
{
- if (column.isMarkedForDelete())
- {
- logger.debug("skipping {}",column.name());
- continue;
- }
-
- dataKey = column.name();
- n++;
-
- if(logger.isDebugEnabled())
- logger.debug("fetching {}",column.name());
-
- DecoratedKey dk = baseCfs.partitioner.decorateKey(dataKey);
- if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
- break outer;
- if (!range.contains(dk) || dataKey.equals(lastDataKey))
- continue;
-
- // get the row columns requested, and additional columns for the expressions if necessary
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, firstFilter));
- // While we the column family we'll get in the end should contains the primary clause column, the firstFilter may not have found it.
- if (data == null)
- data = ColumnFamily.create(baseCfs.metadata);
- logger.debug("fetched data row {}", data);
- NamesQueryFilter extraFilter = null;
- if (dataFilter instanceof SliceQueryFilter && !isIdentityFilter((SliceQueryFilter)dataFilter))
+ while (true)
{
- // we might have gotten the expression columns in with the main data slice, but
- // we can't know for sure until that slice is done. So, we'll do the extra query
- // if we go through and any expression columns are not present.
- boolean needExtraFilter = false;
- for (IndexExpression expr : clause.expressions)
+ if (indexColumns == null || !indexColumns.hasNext())
{
- if (data.getColumn(expr.column_name) == null)
+ if (columnsRead < filter.maxResults)
{
- logger.debug("adding extraFilter to cover additional expressions");
- // Lazily creating extra filter
- needExtraFilter = true;
- break;
+ logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, filter.maxResults);
+ return endOfData();
}
- }
- if (needExtraFilter)
- {
- // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
- // why we do the dance of avoiding to query any column we already have (it's also more efficient anyway)
- extraFilter = getExtraFilter(clause);
- for (IndexExpression expr : clause.expressions)
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Scanning index %s starting with %s",
+ expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
+
+ // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
+ int count = Math.max(filter.maxResults, 2);
+ QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+ new QueryPath(index.getIndexCfs().getColumnFamilyName()),
+ lastSeenKey,
+ endKey,
+ false,
+ count);
+ ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
+ logger.debug("fetched {}", indexRow);
+ if (indexRow == null)
{
- if (data.getColumn(expr.column_name) != null)
- extraFilter.columns.remove(expr.column_name);
+ logger.debug("no data, all done");
+ return endOfData();
}
- assert !extraFilter.columns.isEmpty();
- ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter));
- if (cf != null)
- data.addAll(cf, HeapAllocator.instance);
- }
- }
+ Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+ columnsRead = sortedColumns.size();
+ indexColumns = sortedColumns.iterator();
+ IColumn firstColumn = sortedColumns.iterator().next();
- if (SecondaryIndexSearcher.satisfies(data, clause, primary))
- {
- logger.debug("row {} satisfies all clauses", data);
- // cut the resultset back to what was requested, if necessary
- if (firstFilter != dataFilter || extraFilter != null)
- {
- ColumnFamily expandedData = data;
- data = expandedData.cloneMeShallow();
- IColumnIterator iter = dataFilter.getMemtableColumnIterator(expandedData, dk);
- new QueryFilter(dk, path, dataFilter).collateColumns(data, Collections.singletonList(iter), baseCfs.gcBefore());
+ // Paging is racy, so it is possible the first column of a page is not the last seen one.
+ if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
+ {
+ // skip the row we already saw w/ the last page of results
+ indexColumns.next();
+ columnsRead--;
+ logger.debug("Skipping {}", baseCfs.getComparator().getString(firstColumn.name()));
+ }
+ else if (range instanceof Range && indexColumns.hasNext() && firstColumn.equals(startKey))
+ {
+ // skip key excluded by range
+ indexColumns.next();
+ columnsRead--;
+ logger.debug("Skipping first key as range excludes it");
+ }
}
- rows.add(new Row(dk, data));
- }
+ while (indexColumns.hasNext())
+ {
+ IColumn column = indexColumns.next();
+ lastSeenKey = column.name();
+ if (column.isMarkedForDelete())
+ {
+ logger.debug("skipping {}", column.name());
+ continue;
+ }
- if (rows.size() == clause.count)
- break outer;
- }
- if (n < clause.count || startKey.equals(dataKey))
- break;
+ DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+ if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
+ {
+ logger.debug("Reached end of assigned scan range");
+ return endOfData();
+ }
+ if (!range.contains(dk))
+ {
+ logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
+ continue;
+ }
- lastDataKey = startKey = dataKey;
- }
+ logger.debug("Returning index hit for {}", dk);
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
+ // While the column family we'll get in the end should contain the primary clause column, the initialFilter may not have found it and data can thus be null
+ if (data == null)
+ data = ColumnFamily.create(baseCfs.metadata);
+ return new Row(dk, data);
+ }
+ }
+ }
- return rows;
+ public void close() throws IOException {}
+ };
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 0ece843..066b2e4 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -39,7 +39,10 @@ public class IndexScanVerbHandler implements IVerbHandler
{
IndexScanCommand command = IndexScanCommand.read(message);
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- List<Row> rows = cfs.indexManager.search(command.index_clause, command.range, QueryFilter.getFilter(command.predicate, cfs.getComparator()));
+ List<Row> rows = cfs.indexManager.search(command.index_clause.expressions,
+ command.range,
+ command.index_clause.count,
+ QueryFilter.getFilter(command.predicate, cfs.getComparator()));
RangeSliceReply reply = new RangeSliceReply(rows);
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index 971ccb3..2353b71 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -18,23 +18,38 @@
package org.apache.cassandra.service;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.RangeSliceReply;
+import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
public class RangeSliceVerbHandler implements IVerbHandler
{
-
private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
+ static List<Row> executeLocally(RangeSliceCommand command) throws ExecutionException, InterruptedException
+ {
+ ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+ IFilter columnFilter = QueryFilter.getFilter(command.predicate, cfs.getComparator());
+
+ if (cfs.indexManager.hasIndexFor(command.row_filter))
+ return cfs.search(command.row_filter, command.range, command.max_keys, columnFilter);
+ else
+ return cfs.getRangeSlice(command.super_column, command.range, command.max_keys, columnFilter, command.row_filter);
+ }
+
public void doVerb(Message message, String id)
{
try
@@ -46,10 +61,7 @@ public class RangeSliceVerbHandler implements IVerbHandler
}
RangeSliceCommand command = RangeSliceCommand.read(message);
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- RangeSliceReply reply = new RangeSliceReply(cfs.getRangeSlice(command.super_column,
- command.range,
- command.max_keys,
- QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+ RangeSliceReply reply = new RangeSliceReply(executeLocally(command));
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 458b8ec..c4c69fd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -830,32 +830,46 @@ public class StorageProxy implements StorageProxyMBean
List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
for (AbstractBounds<RowPosition> range : ranges)
{
- List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+ RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
+ command.column_family,
+ command.super_column,
+ command.predicate,
+ range,
+ command.row_filter,
+ command.max_keys);
+
+ List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
{
if (logger.isDebugEnabled())
logger.debug("local range slice");
- ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- rows.addAll(cfs.getRangeSlice(command.super_column,
- range,
- command.max_keys,
- QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+
+ try
+ {
+ rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd));
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
else
{
- RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
-
// collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, liveEndpoints);
+ ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
handler.assureSufficientLiveNodes();
for (InetAddress endpoint : handler.endpoints)
{
- MessagingService.instance().sendRR(c2, endpoint, handler);
+ MessagingService.instance().sendRR(nodeCmd, endpoint, handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + c2 + " from " + endpoint);
+ logger.debug("reading " + nodeCmd + " from " + endpoint);
}
try
@@ -880,7 +894,7 @@ public class StorageProxy implements StorageProxyMBean
}
// if we're done, great, otherwise, move to the next range
- if (rows.size() >= command.max_keys)
+ if (rows.size() >= nodeCmd.max_keys)
break;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1a71f04..4987617 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -668,7 +668,7 @@ public class CassandraServer implements Cassandra.Iface
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, predicate);
- ThriftValidation.validateKeyRange(range);
+ ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
List<Row> rows;
@@ -690,7 +690,7 @@ public class CassandraServer implements Cassandra.Iface
schedule(DatabaseDescriptor.getRpcTimeout());
try
{
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.row_filter, range.count), consistency_level);
}
finally
{
@@ -739,7 +739,11 @@ public class CassandraServer implements Cassandra.Iface
List<Row> rows;
try
{
- rows = StorageProxy.scan(keyspace, column_parent.column_family, index_clause, column_predicate, consistency_level);
+ rows = StorageProxy.scan(keyspace,
+ column_parent.column_family,
+ index_clause,
+ column_predicate,
+ consistency_level);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 76635d9..fd73c53 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -479,7 +479,7 @@ public class ThriftValidation
validateColumnNames(metadata, column_parent, predicate.column_names);
}
- public static void validateKeyRange(KeyRange range) throws InvalidRequestException
+ public static void validateKeyRange(CFMetaData metadata, ByteBuffer superColumn, KeyRange range) throws InvalidRequestException
{
if ((range.start_key == null) != (range.end_key == null))
{
@@ -508,22 +508,54 @@ public class ThriftValidation
}
}
+ validateFilterClauses(metadata, range.row_filter);
+
+ if (!isEmpty(range.row_filter) && superColumn != null)
+ {
+ throw new InvalidRequestException("super columns are not yet supported for indexing");
+ }
+ if (!isEmpty(range.row_filter) && range.start_key == null)
+ {
+ // TODO: our current KEYS indexes can't do that efficiently
+ // (without scanning *all* the keys in the range and simply applying the filter to discard them when they don't match)
+ // See KeysSearcher.search()
+ throw new InvalidRequestException("filtered queries must use concrete keys rather than tokens");
+ }
+
if (range.count <= 0)
{
throw new InvalidRequestException("maxRows must be positive");
}
}
+ private static boolean isEmpty(List<IndexExpression> clause)
+ {
+ return clause == null || clause.isEmpty();
+ }
+
public static void validateIndexClauses(CFMetaData metadata, IndexClause index_clause)
throws InvalidRequestException
{
if (index_clause.expressions.isEmpty())
throw new InvalidRequestException("index clause list may not be empty");
+
+ if (!validateFilterClauses(metadata, index_clause.expressions))
+ throw new InvalidRequestException("No indexed columns present in index clause with operator EQ");
+ }
+
+ // return true if index_clause contains an indexed column with operator EQ
+ public static boolean validateFilterClauses(CFMetaData metadata, List<IndexExpression> index_clause)
+ throws InvalidRequestException
+ {
+ if (isEmpty(index_clause))
+ // no filter to apply
+ return false;
+
Set<ByteBuffer> indexedColumns = Table.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.getIndexedColumns();
AbstractType nameValidator = ColumnFamily.getComparatorFor(metadata.ksName, metadata.cfName, null);
boolean isIndexed = false;
- for (IndexExpression expression : index_clause.expressions)
+ for (IndexExpression expression : index_clause)
{
try
{
@@ -553,8 +585,7 @@ public class ThriftValidation
isIndexed |= (expression.op == IndexOperator.EQ) && indexedColumns.contains(expression.column_name);
}
- if (!isIndexed)
- throw new InvalidRequestException("No indexed columns present in index clause with operator EQ");
+ return isIndexed;
}
public static void validateCfDef(CfDef cf_def, CFMetaData old) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/test/system/test_thrift_server.py
----------------------------------------------------------------------
diff --git a/test/system/test_thrift_server.py b/test/system/test_thrift_server.py
index ab8e1bd..fdc8736 100644
--- a/test/system/test_thrift_server.py
+++ b/test/system/test_thrift_server.py
@@ -213,8 +213,8 @@ def _expect_exception(fn, type_):
def _expect_missing(fn):
_expect_exception(fn, NotFoundException)
-def get_range_slice(client, parent, predicate, start, end, count, cl):
- kr = KeyRange(start, end, count=count)
+def get_range_slice(client, parent, predicate, start, end, count, cl, row_filter=None):
+ kr = KeyRange(start, end, count=count, row_filter=row_filter)
return client.get_range_slices(parent, predicate, kr, cl)
@@ -977,7 +977,7 @@ class TestMutations(ThriftTester):
client.insert(key, ColumnParent('Standard1'), Column(key, 'v', 0), ConsistencyLevel.ONE)
def check_slices_against_keys(keyList, sliceList):
- assert len(keyList) == len(sliceList)
+ assert len(keyList) == len(sliceList), "%d vs %d" % (len(keyList), len(sliceList))
for key, ks in zip(keyList, sliceList):
assert key == ks.key
@@ -1484,11 +1484,14 @@ class TestMutations(ThriftTester):
client.insert('key3', ColumnParent('ToBeIndexed'), Column('birthdate', _i64(3), 0), ConsistencyLevel.ONE)
client.insert('key3', ColumnParent('ToBeIndexed'), Column('b', _i64(3), 0), ConsistencyLevel.ONE)
- # Should fail without index
+ # First without index
cp = ColumnParent('ToBeIndexed')
sp = SlicePredicate(slice_range=SliceRange('', ''))
- clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
- _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+ clause = FilterClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
+ assert len(result) == 1, result
+ assert result[0].key == 'key1'
+ assert len(result[0].columns) == 1, result[0].columns
# add an index on 'birthdate'
ks1 = client.describe_keyspace('Keyspace1')
@@ -1507,11 +1510,8 @@ class TestMutations(ThriftTester):
# sleep a bit to give time for the index to build.
time.sleep(0.5)
- # simple query on one index expression
- cp = ColumnParent('ToBeIndexed')
- sp = SlicePredicate(slice_range=SliceRange('', ''))
- clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
- result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ # repeat query on one index expression
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
assert len(result) == 1, result
assert result[0].key == 'key1'
assert len(result[0].columns) == 1, result[0].columns
@@ -1857,21 +1857,21 @@ class TestMutations(ThriftTester):
# simple query on one index expression
cp = ColumnParent('Indexed1')
sp = SlicePredicate(slice_range=SliceRange('', ''))
- clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
- result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ clause = FilterClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
assert len(result) == 1, result
assert result[0].key == 'key1'
assert len(result[0].columns) == 1, result[0].columns
- # solo unindexed expression is invalid
- clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(1))], '')
- _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+ # without index
+ clause = FilterClause([IndexExpression('b', IndexOperator.EQ, _i64(1))])
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
+ assert len(result) == 0, result
# but unindexed expression added to indexed one is ok
- clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(3)),
- IndexExpression('birthdate', IndexOperator.EQ, _i64(3))],
- '')
- result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ clause = FilterClause([IndexExpression('b', IndexOperator.EQ, _i64(3)),
+ IndexExpression('birthdate', IndexOperator.EQ, _i64(3))])
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
assert len(result) == 1, result
assert result[0].key == 'key3'
assert len(result[0].columns) == 2, result[0].columns
@@ -1885,20 +1885,20 @@ class TestMutations(ThriftTester):
client.insert('key1', ColumnParent('Indexed3'), Column(u, 'a', 0), ConsistencyLevel.ONE)
client.insert('key1', ColumnParent('Indexed3'), Column(u2, 'b', 0), ConsistencyLevel.ONE)
# name comparator + data validator of incompatible types -- see CASSANDRA-2347
- clause = IndexClause([IndexExpression(u, IndexOperator.EQ, 'a'),
- IndexExpression(u2, IndexOperator.EQ, 'b')], '')
- result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ clause = FilterClause([IndexExpression(u, IndexOperator.EQ, 'a'),
+ IndexExpression(u2, IndexOperator.EQ, 'b')])
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
assert len(result) == 1, result
cp = ColumnParent('Indexed2') # timeuuid name, long values
# name must be valid (TimeUUID)
- clause = IndexClause([IndexExpression('foo', IndexOperator.EQ, uuid.UUID('00000000-0000-1000-0000-000000000000').bytes)], '')
- _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+ clause = FilterClause([IndexExpression('foo', IndexOperator.EQ, uuid.UUID('00000000-0000-1000-0000-000000000000').bytes)])
+ _expect_exception(lambda: get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause), InvalidRequestException)
# value must be valid (TimeUUID)
- clause = IndexClause([IndexExpression(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, IndexOperator.EQ, "foo")], '')
- _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
+ clause = FilterClause([IndexExpression(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, IndexOperator.EQ, "foo")])
+ _expect_exception(lambda: get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause), InvalidRequestException)
def test_index_scan_expiring(self):
""" Test that column ttled expires from KEYS index"""
@@ -1906,13 +1906,13 @@ class TestMutations(ThriftTester):
client.insert('key1', ColumnParent('Indexed1'), Column('birthdate', _i64(1), 0, 1), ConsistencyLevel.ONE)
cp = ColumnParent('Indexed1')
sp = SlicePredicate(slice_range=SliceRange('', ''))
- clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))], '')
+ clause = FilterClause([IndexExpression('birthdate', IndexOperator.EQ, _i64(1))])
# query before expiration
- result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
assert len(result) == 1, result
# wait for expiration and requery
time.sleep(2)
- result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ result = get_range_slice(client, cp, sp, '', '', 100, ConsistencyLevel.ONE, clause)
assert len(result) == 0, result
def test_column_not_found_quorum(self):
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index c80486e..0248869 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -133,7 +133,8 @@ public class Util
return cfs.getRangeSlice(superColumn,
new Bounds<Token>(min, min).toRowBounds(),
10000,
- new IdentityQueryFilter());
+ new IdentityQueryFilter(),
+ null);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3297a96e/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 7edab90..b1b34ee 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -81,7 +80,7 @@ public class CleanupTest extends CleanupHelper
// record max timestamps of the sstables pre-cleanup
List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
- rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ rows = Util.getRangeSlice(cfs);
assertEquals(LOOPS, rows.size());
// with one token in the ring, owned by the local node, cleanup should be a no-op
@@ -91,7 +90,7 @@ public class CleanupTest extends CleanupHelper
assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
// check data is still there
- rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ rows = Util.getRangeSlice(cfs);
assertEquals(LOOPS, rows.size());
}
@@ -106,7 +105,7 @@ public class CleanupTest extends CleanupHelper
// insert data and verify we get it back w/ range query
fillCF(cfs, LOOPS);
- rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ rows = Util.getRangeSlice(cfs);
assertEquals(LOOPS, rows.size());
SecondaryIndex index = cfs.indexManager.getIndexForColumn(COLUMN);
@@ -116,11 +115,11 @@ public class CleanupTest extends CleanupHelper
// verify we get it back w/ index query too
IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE);
- IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE);
+ List<IndexExpression> clause = Arrays.asList(expr);
IFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- rows = table.getColumnFamilyStore(CF1).search(clause, range, filter);
+ rows = table.getColumnFamilyStore(CF1).search(clause, range, Integer.MAX_VALUE, filter);
assertEquals(LOOPS, rows.size());
// we don't allow cleanup when the local host has no range, to avoid wiping out all data when a node has not joined the ring.
@@ -135,14 +134,14 @@ public class CleanupTest extends CleanupHelper
CompactionManager.instance.performCleanup(cfs, new NodeId.OneShotRenewer());
// row data should be gone
- rows = cfs.getRangeSlice(null, range, 1000, new IdentityQueryFilter());
+ rows = Util.getRangeSlice(cfs);
assertEquals(0, rows.size());
// not only should it be gone but there should be no data on disk, not even tombstones
assert cfs.getSSTables().isEmpty();
// 2ary indexes should result in no results, too (although tombstones won't be gone until compacted)
- rows = cfs.search(clause, range, filter);
+ rows = cfs.search(clause, range, Integer.MAX_VALUE, filter);
assertEquals(0, rows.size());
}