Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:41 UTC
[3/4] Add auto paging capability to the native protocol
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index c9f715c..c02da1d 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -45,18 +45,15 @@ public class RowIteratorFactory
* and filtered by the queryfilter.
* @param memtables Memtables pending flush.
* @param sstables SStables to scan through.
- * @param startWith Start at this key
- * @param stopAt Stop and this key
- * @param filter Used to decide which columns to pull out
+ * @param range The data range to fetch
* @param cfs
* @return A row iterator following all the given restrictions
*/
public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
final Collection<SSTableReader> sstables,
- final RowPosition startWith,
- final RowPosition stopAt,
- final QueryFilter filter,
- final ColumnFamilyStore cfs)
+ final DataRange range,
+ final ColumnFamilyStore cfs,
+ final long now)
{
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -64,19 +61,19 @@ public class RowIteratorFactory
// memtables
for (Memtable memtable : memtables)
{
- iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(filter, memtable.getEntryIterator(startWith, stopAt)));
+ iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
}
for (SSTableReader sstable : sstables)
{
- final SSTableScanner scanner = sstable.getScanner(filter, startWith);
+ final SSTableScanner scanner = sstable.getScanner(range);
iterators.add(scanner);
}
// reduce rows from all sources into a single row
return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
{
- private final int gcBefore = cfs.gcBefore(filter.timestamp);
+ private final int gcBefore = cfs.gcBefore(now);
private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
private DecoratedKey key;
private ColumnFamily returnCF;
@@ -96,17 +93,16 @@ public class RowIteratorFactory
protected Row getReduced()
{
-
// First check if this row is in the rowCache. If it is we can skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
if (cached == null)
{
// not cached: collate
- filter.collateOnDiskAtom(returnCF, colIters, gcBefore);
+ QueryFilter.collateOnDiskAtom(returnCF, colIters, range.columnFilter(key.key), gcBefore, now);
}
else
{
- QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+ QueryFilter keyFilter = new QueryFilter(key, cfs.name, range.columnFilter(key.key), now);
returnCF = cfs.filterColumnFamily(cached, keyFilter);
}
@@ -123,12 +119,12 @@ public class RowIteratorFactory
*/
private static class ConvertToColumnIterator<T extends ColumnFamily> implements CloseableIterator<OnDiskAtomIterator>
{
- private final QueryFilter filter;
+ private final DataRange range;
private final Iterator<Map.Entry<DecoratedKey, T>> iter;
- public ConvertToColumnIterator(QueryFilter filter, Iterator<Map.Entry<DecoratedKey, T>> iter)
+ public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, T>> iter)
{
- this.filter = filter;
+ this.range = range;
this.iter = iter;
}
@@ -151,7 +147,7 @@ public class RowIteratorFactory
{
public OnDiskAtomIterator create()
{
- return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey());
+ return range.columnFilter(entry.getKey().key).getColumnFamilyIterator(entry.getKey(), entry.getValue());
}
});
}
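Note on the merge step above: getIterator collects one key-sorted iterator per memtable and sstable, then reduces entries that share a key into a single Row. As a rough stand-alone illustration of that k-way merge-and-reduce pattern (hypothetical types, not the actual MergeIterator/Reducer API):

import java.util.*;

// Hypothetical sketch: several key-sorted sources are merged, and entries
// sharing a key are reduced into a single output, analogous to combining
// per-memtable and per-sstable row iterators into one Row per key.
class KWayMergeSketch
{
    static <K extends Comparable<K>, V> List<Map.Entry<K, List<V>>> mergeByKey(List<Iterator<Map.Entry<K, V>>> sources)
    {
        PriorityQueue<Source<K, V>> pq = new PriorityQueue<>();
        for (Iterator<Map.Entry<K, V>> it : sources)
            if (it.hasNext())
                pq.add(new Source<>(it));

        List<Map.Entry<K, List<V>>> out = new ArrayList<>();
        while (!pq.isEmpty())
        {
            K key = pq.peek().head.getKey();
            List<V> reduced = new ArrayList<>();
            // Pull every source whose head carries the same key (the "reduce" step).
            while (!pq.isEmpty() && pq.peek().head.getKey().compareTo(key) == 0)
            {
                Source<K, V> s = pq.poll();
                reduced.add(s.head.getValue());
                if (s.advance())
                    pq.add(s);
            }
            out.add(new AbstractMap.SimpleEntry<>(key, reduced));
        }
        return out;
    }

    // A source iterator plus its current head, ordered by that head's key.
    private static class Source<K extends Comparable<K>, V> implements Comparable<Source<K, V>>
    {
        final Iterator<Map.Entry<K, V>> it;
        Map.Entry<K, V> head;

        Source(Iterator<Map.Entry<K, V>> it)
        {
            this.it = it;
            this.head = it.next();
        }

        boolean advance()
        {
            if (!it.hasNext())
                return false;
            head = it.next();
            return true;
        }

        public int compareTo(Source<K, V> o)
        {
            return head.getKey().compareTo(o.head.getKey());
        }
    }
}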
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 5c42de5..508d1d2 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -106,6 +106,11 @@ public class SliceFromReadCommand extends ReadCommand
return filter;
}
+ public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
+ {
+ return new SliceFromReadCommand(table, key, cfName, timestamp, newFilter);
+ }
+
/**
* The original number of columns requested by the user.
* This can be different from count when the slice command is a retry (see
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
deleted file mode 100644
index c1933ad..0000000
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.filter.*;
-
-public class SliceQueryPager implements Iterator<ColumnFamily>
-{
- public static final int DEFAULT_PAGE_SIZE = 10000;
-
- public final ColumnFamilyStore cfs;
- public final DecoratedKey key;
-
- private ColumnSlice[] slices;
- private boolean exhausted;
-
- public SliceQueryPager(ColumnFamilyStore cfs, DecoratedKey key, ColumnSlice[] slices)
- {
- this.cfs = cfs;
- this.key = key;
- this.slices = slices;
- }
-
- // This will *not* do a query
- public boolean hasNext()
- {
- return !exhausted;
- }
-
- // This might return an empty column family (but never a null one)
- public ColumnFamily next()
- {
- if (exhausted)
- return null;
-
- long now = System.currentTimeMillis();
- SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
- QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
- ColumnFamily cf = cfs.getColumnFamily(filter);
- if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
- {
- exhausted = true;
- }
- else
- {
- Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
- Column lastColumn = iter.next();
- while (lastColumn.isMarkedForDelete(now))
- lastColumn = iter.next();
-
- int i = 0;
- for (; i < slices.length; ++i)
- {
- ColumnSlice current = slices[i];
- if (cfs.getComparator().compare(lastColumn.name(), current.finish) <= 0)
- break;
- }
- if (i >= slices.length)
- exhausted = true;
- else
- slices = Arrays.copyOfRange(slices, i, slices.length);
- }
- return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-}
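The deleted pager implemented a simple column-paging loop: fetch up to DEFAULT_PAGE_SIZE columns, and if the page came back full, advance the remaining slices past the last live column returned; a short page means the row is exhausted. A stand-alone sketch of that loop over a plain sorted map (hypothetical names, not Cassandra types):

import java.util.*;

class ColumnPagerSketch
{
    static final int PAGE_SIZE = 3; // small page for demonstration

    // Page through the "columns" of a row (here a sorted map of column name
    // to value), starting each page after the last column of the previous
    // one, until a short page signals exhaustion.
    static void pageThrough(NavigableMap<String, String> row)
    {
        String startAfter = null;      // exclusive lower bound for the next page
        boolean exhausted = false;
        while (!exhausted)
        {
            SortedMap<String, String> remaining =
                startAfter == null ? row : row.tailMap(startAfter, false);

            List<Map.Entry<String, String>> page = new ArrayList<>();
            for (Map.Entry<String, String> e : remaining.entrySet())
            {
                page.add(e);
                if (page.size() == PAGE_SIZE)
                    break;
            }

            System.out.println("page: " + page);

            if (page.size() < PAGE_SIZE)
                exhausted = true;                                 // short page: nothing left
            else
                startAfter = page.get(page.size() - 1).getKey();  // resume after last column
        }
    }

    public static void main(String[] args)
    {
        NavigableMap<String, String> row = new TreeMap<>();
        for (char c = 'a'; c <= 'g'; c++)
            row.put(String.valueOf(c), "v" + c);
        pageThrough(row);
    }
}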
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 90bfd8d..8528515 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -678,7 +678,8 @@ public class SystemTable
return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
null,
new IdentityQueryFilter(),
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
}
public static Collection<RowMutation> serializeSchema()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 772f842..409076f 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.tracing.Tracing;
/**
@@ -50,6 +51,7 @@ import org.apache.cassandra.tracing.Tracing;
public class Table
{
public static final String SYSTEM_KS = "system";
+ private static final int DEFAULT_PAGE_SIZE = 10000;
private static final Logger logger = LoggerFactory.getLogger(Table.class);
@@ -398,7 +400,7 @@ public class Table
switchLock.readLock().lock();
try
{
- SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
+ Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
while (pager.hasNext())
{
ColumnFamily cf = pager.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index a5d54ce..2fda715 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -61,6 +61,16 @@ public class ColumnCounter
return ignored;
}
+ public ColumnCounter countAll(ColumnFamily container)
+ {
+ if (container == null)
+ return this;
+
+ for (Column c : container)
+ count(c, container);
+ return this;
+ }
+
public static class GroupByPrefix extends ColumnCounter
{
private final CompositeType type;
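countAll() above returns this so a counter can be built, fed a whole container and read in one expression, as getLiveCount() does later in this patch with columnCounter(cf.getComparator(), now).countAll(cf).live(). A tiny stand-in for that chaining (hypothetical class, not ColumnCounter; "live" here just means positive):

class LiveCounter
{
    private int live;

    void count(int value)
    {
        if (value > 0)
            live++;
    }

    // Count every value in the container and return this for chaining.
    LiveCounter countAll(Iterable<Integer> values)
    {
        if (values == null)
            return this;
        for (int v : values)
            count(v);
        return this;
    }

    int live()
    {
        return live;
    }

    public static void main(String[] args)
    {
        // One expression, mirroring columnCounter(...).countAll(cf).live()
        int n = new LiveCounter().countAll(java.util.Arrays.asList(1, -2, 3)).live();
        System.out.println(n); // 2
    }
}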
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index af7431c..49503b5 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -89,6 +89,12 @@ public class ColumnSlice
return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
}
+ public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
+ {
+ return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
+ }
+
+
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index d061ac2..e0f3ec3 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.filter;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -32,6 +33,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,47 +47,36 @@ public abstract class ExtendedFilter
public final ColumnFamilyStore cfs;
public final long timestamp;
- protected final IDiskAtomFilter originalFilter;
+ public final DataRange dataRange;
private final int maxResults;
private final boolean countCQL3Rows;
- private final boolean isPaging;
+ private volatile int currentLimit;
public static ExtendedFilter create(ColumnFamilyStore cfs,
+ DataRange dataRange,
List<IndexExpression> clause,
- IDiskAtomFilter filter,
int maxResults,
- long timestamp,
boolean countCQL3Rows,
- boolean isPaging)
+ long timestamp)
{
if (clause == null || clause.isEmpty())
- {
- return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
- }
- else
- {
- if (isPaging)
- throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
- return cfs.getComparator() instanceof CompositeType
- ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
- : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
- }
+ return new EmptyClauseFilter(cfs, dataRange, maxResults, countCQL3Rows, timestamp);
+
+ return new WithClauses(cfs, dataRange, clause, maxResults, countCQL3Rows, timestamp);
}
- protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
+ protected ExtendedFilter(ColumnFamilyStore cfs, DataRange dataRange, int maxResults, boolean countCQL3Rows, long timestamp)
{
assert cfs != null;
- assert filter != null;
+ assert dataRange != null;
this.cfs = cfs;
- this.originalFilter = filter;
+ this.dataRange = dataRange;
this.maxResults = maxResults;
this.timestamp = timestamp;
this.countCQL3Rows = countCQL3Rows;
- this.isPaging = isPaging;
+ this.currentLimit = maxResults;
if (countCQL3Rows)
- originalFilter.updateColumnsLimit(maxResults);
- if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0))
- throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column");
+ dataRange.updateColumnsLimit(maxResults);
}
public int maxRows()
@@ -98,37 +89,30 @@ public abstract class ExtendedFilter
return countCQL3Rows ? maxResults : Integer.MAX_VALUE;
}
- /**
- * Update the filter if necessary given the number of column already
- * fetched.
- */
- public void updateFilter(int currentColumnsCount)
+ public int currentLimit()
{
- // As soon as we'd done our first call, we want to reset the start column if we're paging
- if (isPaging)
- ((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
- if (!countCQL3Rows)
- return;
+ return currentLimit;
+ }
- int remaining = maxResults - currentColumnsCount;
- initialFilter().updateColumnsLimit(remaining);
+ public IDiskAtomFilter columnFilter(ByteBuffer key)
+ {
+ return dataRange.columnFilter(key);
}
public int lastCounted(ColumnFamily data)
{
- if (initialFilter() instanceof SliceQueryFilter)
- return ((SliceQueryFilter)initialFilter()).lastCounted();
- else
- return initialFilter().getLiveCount(data, timestamp);
+ return dataRange.getLiveCount(data, timestamp);
}
- /** The initial filter we'll do our first slice with (either the original or a superset of it) */
- public abstract IDiskAtomFilter initialFilter();
-
- public IDiskAtomFilter originalFilter()
+ public void updateFilter(int currentColumnsCount)
{
- return originalFilter;
+ if (!countCQL3Rows)
+ return;
+
+ currentLimit = maxResults - currentColumnsCount;
+ // We propagate that limit to the underlying filter so each internal query doesn't
+ // fetch more than we need it to.
+ dataRange.updateColumnsLimit(currentLimit);
}
public abstract List<IndexExpression> getClause();
@@ -138,18 +122,18 @@ public abstract class ExtendedFilter
* @param data the data retrieve by the initial filter
* @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row)
*/
- public abstract IDiskAtomFilter getExtraFilter(ColumnFamily data);
+ public abstract IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data);
/**
* @return data pruned down to the columns originally asked for
*/
- public abstract ColumnFamily prune(ColumnFamily data);
+ public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
/**
* @return true if the provided data satisfies all the expressions from
* the clause of this filter.
*/
- public abstract boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder);
+ public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder);
public static boolean satisfies(int comparison, IndexOperator op)
{
@@ -170,44 +154,54 @@ public abstract class ExtendedFilter
}
}
- private static class FilterWithClauses extends ExtendedFilter
+ public static class WithClauses extends ExtendedFilter
{
- protected final List<IndexExpression> clause;
- protected final IDiskAtomFilter initialFilter;
-
- public FilterWithClauses(ColumnFamilyStore cfs,
- List<IndexExpression> clause,
- IDiskAtomFilter filter,
- int maxResults,
- long timestamp,
- boolean countCQL3Rows)
+ private final List<IndexExpression> clause;
+ private final IDiskAtomFilter optimizedFilter;
+
+ public WithClauses(ColumnFamilyStore cfs,
+ DataRange range,
+ List<IndexExpression> clause,
+ int maxResults,
+ boolean countCQL3Rows,
+ long timestamp)
{
- super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
+ super(cfs, range, maxResults, countCQL3Rows, timestamp);
assert clause != null;
this.clause = clause;
- this.initialFilter = computeInitialFilter();
+ this.optimizedFilter = computeOptimizedFilter();
}
- /** Sets up the initial filter. */
- protected IDiskAtomFilter computeInitialFilter()
+ /*
+ * Potentially optimize the column filter if we have a chance to make it catch all clauses
+ * right away.
+ */
+ private IDiskAtomFilter computeOptimizedFilter()
{
- if (originalFilter instanceof SliceQueryFilter)
+ /*
+ * We shouldn't do the "optimization" for composites as the index names are not valid column names
+ * (which the rest of the method assumes). Said optimization is not useful for composites anyway.
+ * We also don't want to do it for paging ranges as the actual filter depends on the row key (it would
+ * probably be possible to make it work but we won't really use it so we don't bother).
+ */
+ if (cfs.getComparator() instanceof CompositeType || dataRange instanceof DataRange.Paging)
+ return null;
+
+ IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
+ if (filter instanceof SliceQueryFilter)
{
// if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
// otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
if (cfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
{
logger.trace("Expanding slice filter to entire row to cover additional expressions");
- return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ((SliceQueryFilter) originalFilter).reversed,
- Integer.MAX_VALUE);
+ return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, ((SliceQueryFilter)filter).reversed, Integer.MAX_VALUE);
}
}
else
{
logger.trace("adding columns to original Filter to cover additional expressions");
- assert originalFilter instanceof NamesQueryFilter;
+ assert filter instanceof NamesQueryFilter;
if (!clause.isEmpty())
{
SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
@@ -215,16 +209,17 @@ public abstract class ExtendedFilter
{
columns.add(expr.column_name);
}
- columns.addAll(((NamesQueryFilter) originalFilter).columns);
- return ((NamesQueryFilter)originalFilter).withUpdatedColumns(columns);
+ columns.addAll(((NamesQueryFilter) filter).columns);
+ return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
}
}
- return originalFilter;
+ return null;
}
- public IDiskAtomFilter initialFilter()
+ @Override
+ public IDiskAtomFilter columnFilter(ByteBuffer key)
{
- return initialFilter;
+ return optimizedFilter == null ? dataRange.columnFilter(key) : optimizedFilter;
}
public List<IndexExpression> getClause()
@@ -233,21 +228,13 @@ public abstract class ExtendedFilter
}
/*
- * We may need an extra query only if the original was a slice query (and thus may have miss the expression for the clause).
- * Even then, there is no point in doing an extra query if the original filter grabbed the whole row.
- * Lastly, we only need the extra query if we haven't yet got all the expressions from the clause.
+ * We may need an extra query only if the original query wasn't selecting the row entirely.
+ * Furthermore, we only need the extra query if we haven't yet got all the expressions from the clause.
*/
- private boolean needsExtraQuery(ColumnFamily data)
+ private boolean needsExtraQuery(ByteBuffer rowKey, ColumnFamily data)
{
- if (!(originalFilter instanceof SliceQueryFilter))
- return false;
-
- SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
- // Check if we've fetch the whole row
- if (filter.slices.length == 1
- && filter.start().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- && filter.finish().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- && filter.count == Integer.MAX_VALUE)
+ IDiskAtomFilter filter = columnFilter(rowKey);
+ if (filter instanceof SliceQueryFilter && DataRange.isFullRowSlice((SliceQueryFilter)filter))
return false;
for (IndexExpression expr : clause)
@@ -261,9 +248,18 @@ public abstract class ExtendedFilter
return false;
}
- public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+ public IDiskAtomFilter getExtraFilter(DecoratedKey rowKey, ColumnFamily data)
{
- if (!needsExtraQuery(data))
+ /*
+ * This method assumes the IndexExpression names are valid column names, which is not the
+ * case with composites. This is ok for now however since:
+ * 1) CompositeSearcher doesn't use it.
+ * 2) We don't yet allow non-indexed range slice with filters in CQL3 (i.e. this will never be
+ * called by CFS.filter() for composites).
+ */
+ assert !(cfs.getComparator() instanceof CompositeType);
+
+ if (!needsExtraQuery(rowKey.key, data))
return null;
// Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
@@ -278,17 +274,19 @@ public abstract class ExtendedFilter
return new NamesQueryFilter(columns);
}
- public ColumnFamily prune(ColumnFamily data)
+ public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
{
- if (initialFilter == originalFilter)
+ if (optimizedFilter == null)
return data;
+
ColumnFamily pruned = data.cloneMeShallow();
- OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
- originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+ IDiskAtomFilter filter = dataRange.columnFilter(rowKey.key);
+ OnDiskAtomIterator iter = filter.getColumnFamilyIterator(rowKey, data);
+ filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
return pruned;
}
- public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+ public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
{
// We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
// where the index returned a row which doesn't have the primary column when we actually read it
@@ -310,7 +308,7 @@ public abstract class ExtendedFilter
}
else
{
- dataValue = extractDataValue(def, rowKey, data, builder);
+ dataValue = extractDataValue(def, rowKey.key, data, builder);
validator = def.getValidator();
}
@@ -346,68 +344,29 @@ public abstract class ExtendedFilter
}
}
- private static class FilterWithCompositeClauses extends FilterWithClauses
- {
- public FilterWithCompositeClauses(ColumnFamilyStore cfs,
- List<IndexExpression> clause,
- IDiskAtomFilter filter,
- int maxResults,
- long timestamp,
- boolean countCQL3Rows)
- {
- super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
- }
-
- /*
- * For composites, the index name is not a valid column name (it's only
- * one of the component), which means we should not do the
- * NamesQueryFilter part of FilterWithClauses in particular.
- * Besides, CompositesSearcher doesn't really use the initial filter
- * expect to know the limit set by the user, so create a fake filter
- * with only the count information.
- */
- protected IDiskAtomFilter computeInitialFilter()
- {
- int limit = originalFilter instanceof SliceQueryFilter
- ? ((SliceQueryFilter)originalFilter).count
- : Integer.MAX_VALUE;
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, limit);
- }
- }
-
private static class EmptyClauseFilter extends ExtendedFilter
{
- public EmptyClauseFilter(ColumnFamilyStore cfs,
- IDiskAtomFilter filter,
- int maxResults,
- long timestamp,
- boolean countCQL3Rows,
- boolean isPaging)
- {
- super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
- }
-
- public IDiskAtomFilter initialFilter()
+ public EmptyClauseFilter(ColumnFamilyStore cfs, DataRange range, int maxResults, boolean countCQL3Rows, long timestamp)
{
- return originalFilter;
+ super(cfs, range, maxResults, countCQL3Rows, timestamp);
}
public List<IndexExpression> getClause()
{
- throw new UnsupportedOperationException();
+ return Collections.<IndexExpression>emptyList();
}
- public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+ public IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data)
{
return null;
}
- public ColumnFamily prune(ColumnFamily data)
+ public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
{
return data;
}
- public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+ public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
{
return true;
}
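The reworked updateFilter keeps a running limit: after each internal query it subtracts the columns already counted from maxResults and pushes the remainder down into the per-row column filter via updateColumnsLimit. A minimal sketch of that bookkeeping, outside of Cassandra (hypothetical names):

// Minimal sketch of the running-limit bookkeeping ExtendedFilter does when
// countCQL3Rows is set: each internal query is capped by whatever is still
// missing from the user's limit.
class RunningLimit
{
    private final int maxResults;
    private int currentLimit;

    RunningLimit(int maxResults)
    {
        this.maxResults = maxResults;
        this.currentLimit = maxResults;
    }

    int currentLimit()
    {
        return currentLimit;
    }

    // Called after currentColumnsCount columns have been counted so far;
    // the next internal query should not fetch more than what remains.
    void updateFilter(int currentColumnsCount)
    {
        currentLimit = maxResults - currentColumnsCount;
    }

    public static void main(String[] args)
    {
        RunningLimit limit = new RunningLimit(100);
        limit.updateFilter(40);                    // 40 already fetched
        System.out.println(limit.currentLimit());  // 60 left for the next query
    }
}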
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 35f71e5..69a8950 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -41,10 +41,10 @@ import org.apache.cassandra.io.util.FileDataInput;
public interface IDiskAtomFilter
{
/**
- * returns an iterator that returns columns from the given memtable
+ * returns an iterator that returns columns from the given columnFamily
* matching the Filter criteria in sorted order.
*/
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
+ public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf);
/**
* Get an iterator that returns columns from the given SSTable using the opened file
@@ -74,6 +74,7 @@ public interface IDiskAtomFilter
public void updateColumnsLimit(int newLimit);
public int getLiveCount(ColumnFamily cf, long now);
+ public ColumnCounter columnCounter(AbstractType<?> comparator, long now);
public IDiskAtomFilter cloneShallow();
public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 297f227..d3f057d 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -27,6 +27,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -75,9 +76,10 @@ public class NamesQueryFilter implements IDiskAtomFilter
return new NamesQueryFilter(newColumns, countCQL3Rows);
}
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+ public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf)
{
- return Memtable.getNamesIterator(key, cf, this);
+ assert cf != null;
+ return new ByNameColumnIterator(columns.iterator(), cf, key);
}
public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -120,6 +122,8 @@ public class NamesQueryFilter implements IDiskAtomFilter
public int getLiveCount(ColumnFamily cf, long now)
{
+ // Note: we could use columnCounter() but we save the object allocation as it's simple enough
+
if (countCQL3Rows)
return cf.hasOnlyTombstones(now) ? 0 : 1;
@@ -147,6 +151,56 @@ public class NamesQueryFilter implements IDiskAtomFilter
return true;
}
+ public boolean countCQL3Rows()
+ {
+ return countCQL3Rows;
+ }
+
+ public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
+ {
+ return countCQL3Rows
+ ? new ColumnCounter.GroupByPrefix(now, null, 0)
+ : new ColumnCounter(now);
+ }
+
+ private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+ {
+ private final ColumnFamily cf;
+ private final DecoratedKey key;
+ private final Iterator<ByteBuffer> iter;
+
+ public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
+ {
+ this.iter = iter;
+ this.cf = cf;
+ this.key = key;
+ }
+
+ public ColumnFamily getColumnFamily()
+ {
+ return cf;
+ }
+
+ public DecoratedKey getKey()
+ {
+ return key;
+ }
+
+ protected OnDiskAtom computeNext()
+ {
+ while (iter.hasNext())
+ {
+ ByteBuffer current = iter.next();
+ Column column = cf.getColumn(current);
+ if (column != null)
+ return column;
+ }
+ return endOfData();
+ }
+
+ public void close() throws IOException { }
+ }
+
public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
{
public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException
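ByNameColumnIterator above leans on Guava's AbstractIterator: computeNext() either returns the next element or calls endOfData(), and the hasNext()/next() bookkeeping is inherited. A self-contained example of the same pattern (hypothetical filtering iterator, not the Cassandra class):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import com.google.common.collect.AbstractIterator;

// Emits only the requested names that are actually present, mirroring how
// ByNameColumnIterator skips columns the row does not contain.
class PresentOnlyIterator extends AbstractIterator<String>
{
    private final Iterator<String> requested;
    private final Set<String> present;

    PresentOnlyIterator(Iterator<String> requested, Set<String> present)
    {
        this.requested = requested;
        this.present = present;
    }

    @Override
    protected String computeNext()
    {
        while (requested.hasNext())
        {
            String name = requested.next();
            if (present.contains(name))
                return name;        // found: emit it
        }
        return endOfData();         // nothing left to emit
    }

    public static void main(String[] args)
    {
        Iterator<String> it = new PresentOnlyIterator(
            Arrays.asList("a", "b", "c", "d").iterator(),
            new HashSet<>(Arrays.asList("b", "d")));
        while (it.hasNext())
            System.out.println(it.next());   // prints b then d
    }
}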
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index ac0c632..4f71f3a 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -48,13 +48,13 @@ public class QueryFilter
ColumnFamily cf = memtable.getColumnFamily(key);
if (cf == null)
return null;
- return getMemtableColumnIterator(cf, key);
+ return getColumnFamilyIterator(cf);
}
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+ public OnDiskAtomIterator getColumnFamilyIterator(ColumnFamily cf)
{
assert cf != null;
- return filter.getMemtableColumnIterator(cf, key);
+ return filter.getColumnFamilyIterator(key, cf);
}
public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
@@ -69,10 +69,15 @@ public class QueryFilter
public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, final int gcBefore)
{
+ collateOnDiskAtom(returnCF, toCollate, filter, gcBefore, timestamp);
+ }
+
+ public static void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
+ {
List<Iterator<Column>> filteredIterators = new ArrayList<Iterator<Column>>(toCollate.size());
for (Iterator<? extends OnDiskAtom> iter : toCollate)
filteredIterators.add(gatherTombstones(returnCF, iter));
- collateColumns(returnCF, filteredIterators, gcBefore);
+ collateColumns(returnCF, filteredIterators, filter, gcBefore, timestamp);
}
/**
@@ -84,7 +89,12 @@ public class QueryFilter
filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
}
- public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
+ public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, int gcBefore)
+ {
+ collateColumns(returnCF, toCollate, filter, gcBefore, timestamp);
+ }
+
+ public static void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
{
final Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
// define a 'reduced' iterator that merges columns w/ the same name, which
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index b76ce04..9cbc49e 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
@@ -95,14 +96,74 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
}
+ public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+ {
+ Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+
+ List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
+ boolean pastNewStart = false;
+ for (int i = 0; i < slices.length; i++)
+ {
+ ColumnSlice slice = slices[i];
+
+ if (pastNewStart)
+ {
+ newSlices.add(slice);
+ continue;
+ }
+
+ if (slices[i].isBefore(cmp, newStart))
+ continue;
+
+ if (slice.includes(cmp, newStart))
+ newSlices.add(new ColumnSlice(newStart, slice.finish));
+ else
+ newSlices.add(slice);
+
+ pastNewStart = true;
+ }
+ return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
+ }
+
public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
{
return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup);
}
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+ public OnDiskAtomIterator getColumnFamilyIterator(final DecoratedKey key, final ColumnFamily cf)
{
- return Memtable.getSliceIterator(key, cf, this);
+ assert cf != null;
+ final Iterator<Column> filteredIter = reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
+
+ return new OnDiskAtomIterator()
+ {
+ public ColumnFamily getColumnFamily()
+ {
+ return cf;
+ }
+
+ public DecoratedKey getKey()
+ {
+ return key;
+ }
+
+ public boolean hasNext()
+ {
+ return filteredIter.hasNext();
+ }
+
+ public OnDiskAtom next()
+ {
+ return filteredIter.next();
+ }
+
+ public void close() throws IOException { }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -122,7 +183,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
{
- columnCounter = getColumnCounter(container, now);
+ columnCounter = columnCounter(container.getComparator(), now);
while (reducedColumns.hasNext())
{
@@ -144,15 +205,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
public int getLiveCount(ColumnFamily cf, long now)
{
- ColumnCounter counter = getColumnCounter(cf, now);
- for (Column column : cf)
- counter.count(column, cf);
- return counter.live();
+ return columnCounter(cf.getComparator(), now).countAll(cf).live();
}
- private ColumnCounter getColumnCounter(ColumnFamily container, long now)
+ public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
{
- AbstractType<?> comparator = container.getComparator();
if (compositesToGroup < 0)
return new ColumnCounter(now);
else if (compositesToGroup == 0)
@@ -163,7 +220,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
public void trim(ColumnFamily cf, int trimTo, long now)
{
- ColumnCounter counter = getColumnCounter(cf, now);
+ ColumnCounter counter = columnCounter(cf.getComparator(), now);
Collection<Column> columns = reversed
? cf.getReverseSortedColumns()
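withUpdatedStart above drops every slice whose finish sorts before the new start and truncates the slice that contains it, which is how paging resumes from the last column already returned. A stand-alone sketch of that trimming over simple [start, finish] string ranges (hypothetical types, not ColumnSlice):

import java.util.*;

class SliceTrimSketch
{
    // A slice is a [start, finish] range of column names; an empty finish
    // would mean "to the end of the row" in Cassandra, ignored here for brevity.
    static class Slice
    {
        final String start, finish;
        Slice(String start, String finish) { this.start = start; this.finish = finish; }
        public String toString() { return "[" + start + ", " + finish + "]"; }
    }

    // Drop slices that finish before newStart, move the start of the slice
    // containing newStart up to it, and keep later slices untouched.
    static List<Slice> withUpdatedStart(List<Slice> slices, String newStart)
    {
        List<Slice> result = new ArrayList<>();
        boolean pastNewStart = false;
        for (Slice s : slices)
        {
            if (pastNewStart)
            {
                result.add(s);
                continue;
            }
            if (s.finish.compareTo(newStart) < 0)
                continue;                                  // entirely before the new start
            if (s.start.compareTo(newStart) <= 0)
                result.add(new Slice(newStart, s.finish)); // truncate this slice
            else
                result.add(s);                             // already starts after newStart
            pastNewStart = true;
        }
        return result;
    }

    public static void main(String[] args)
    {
        List<Slice> slices = Arrays.asList(new Slice("a", "c"), new Slice("e", "g"), new Slice("i", "k"));
        System.out.println(withUpdatedStart(slices, "f")); // [[f, g], [i, k]]
    }
}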
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 17ac81f..a40f4bd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -514,14 +515,9 @@ public class SecondaryIndexManager
* @param columnFilter the column range to restrict to
* @return found indexed rows
*/
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter columnFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows)
+ public List<Row> search(ExtendedFilter filter)
{
- List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
+ List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause());
if (indexSearchers.isEmpty())
return Collections.emptyList();
@@ -530,7 +526,7 @@ public class SecondaryIndexManager
if (indexSearchers.size() > 1)
throw new RuntimeException("Unable to search across multiple secondary index types");
- return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+ return indexSearchers.get(0).search(filter);
}
public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index ddd79dd..d28afc0 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -21,8 +21,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -39,12 +38,7 @@ public abstract class SecondaryIndexSearcher
this.baseCfs = indexManager.baseCfs;
}
- public abstract List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows);
+ public abstract List<Row> search(ExtendedFilter filter);
/**
* @return true this index is able to handle given clauses.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e8c0a09..f9b7b11 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -44,16 +44,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows)
+ public List<Row> search(ExtendedFilter filter)
{
- assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
- return baseCfs.filter(getIndexedIterator(range, filter), filter);
+ assert filter.getClause() != null && !filter.getClause().isEmpty();
+ return baseCfs.filter(getIndexedIterator(filter), filter);
}
private ByteBuffer makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
@@ -62,10 +56,11 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
ColumnNameBuilder builder;
- if (filter.originalFilter() instanceof SliceQueryFilter)
+ IDiskAtomFilter columnFilter = filter.columnFilter(key);
+ if (columnFilter instanceof SliceQueryFilter)
{
- SliceQueryFilter originalFilter = (SliceQueryFilter)filter.originalFilter();
- builder = index.makeIndexColumnNameBuilder(key, isStart ? originalFilter.start() : originalFilter.finish());
+ SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
+ builder = index.makeIndexColumnNameBuilder(key, isStart ? sqf.start() : sqf.finish());
}
else
{
@@ -74,7 +69,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return isStart ? builder.build() : builder.buildAsEndOfRange();
}
- private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
@@ -93,6 +88,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
* possible key having a given token. A fix would be to actually store the token along the key in the
* indexed row.
*/
+ final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -140,12 +136,12 @@ public class CompositesSearcher extends SecondaryIndexSearcher
DecoratedKey currentKey = null;
ColumnFamily data = null;
int columnsCount = 0;
- int limit = ((SliceQueryFilter)filter.initialFilter()).count;
+ int limit = filter.currentLimit();
while (true)
{
// Did we got more columns that needed to respect the user limit?
- // (but we still need to return was fetch already)
+ // (but we still need to return what was fetched already)
if (columnsCount > limit)
return makeReturn(currentKey, data);
@@ -235,7 +231,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// Check if this entry cannot be a hit due to the original column filter
ByteBuffer start = entry.indexedEntryStart();
- if (!filter.originalFilter().maySelectPrefix(baseComparator, start))
+ if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
continue;
logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name()));
@@ -256,7 +252,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
assert newData != null : "An entry with not data should have been considered stale";
- if (!filter.isSatisfiedBy(dk.key, newData, entry.indexedEntryNameBuilder))
+ if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder))
continue;
if (data == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index e919d8a..205efb7 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -47,20 +47,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows)
+ public List<Row> search(ExtendedFilter filter)
{
- assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
- return baseCfs.filter(getIndexedIterator(range, filter), filter);
+ assert filter.getClause() != null && !filter.getClause().isEmpty();
+ return baseCfs.filter(getIndexedIterator(filter), filter);
}
- private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
{
+
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
// TODO: allow merge join instead of just one index + loop
@@ -79,6 +74,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
* possible key having a given token. A fix would be to actually store the token along the key in the
* indexed row.
*/
+ final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -165,14 +161,14 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey), filter.timestamp));
// While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
if (data == null)
data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
// as in CFS.filter - extend the filter to ensure we include the columns
// from the index expressions, just in case they weren't included in the initialFilter
- IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+ IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data);
if (extraFilter != null)
{
ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 85f2677..6c10d95 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
@@ -994,17 +995,12 @@ public class SSTableReader extends SSTable
/**
*
- * @param filter filter to use when reading the columns
+ * @param dataRange filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public SSTableScanner getScanner(QueryFilter filter)
+ public SSTableScanner getScanner(DataRange dataRange)
{
- return new SSTableScanner(this, filter, null);
- }
-
- public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
- {
- return new SSTableScanner(this, filter, startWith, null);
+ return new SSTableScanner(this, dataRange, null);
}
/**
@@ -1018,7 +1014,7 @@ public class SSTableReader extends SSTable
public SSTableScanner getScanner(RateLimiter limiter)
{
- return new SSTableScanner(this, null, limiter);
+ return new SSTableScanner(this, DataRange.allData(partitioner), limiter);
}
/**
@@ -1034,7 +1030,7 @@ public class SSTableReader extends SSTable
Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
if (rangeIterator.hasNext())
- return new SSTableScanner(this, null, range, limiter);
+ return new SSTableScanner(this, DataRange.forKeyRange(range), limiter);
else
return new EmptyCompactionScanner(getFilename());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index fb52a02..66e7189 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import java.util.Iterator;
import com.google.common.util.concurrent.RateLimiter;
-
import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowPosition;
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileUtils;
@@ -43,43 +44,46 @@ public class SSTableScanner implements ICompactionScanner
protected final RandomAccessReader dfile;
protected final RandomAccessReader ifile;
public final SSTableReader sstable;
+ private final DataRange dataRange;
+ private final long stopAt;
+
protected Iterator<OnDiskAtomIterator> iterator;
- private final QueryFilter filter;
- private long stopAt;
/**
* @param sstable SSTable to scan; must not be null
- * @param filter filter to use when scanning the columns; may be null
+ * @param dataRange range of data to fetch; must not be null
* @param limiter background i/o RateLimiter; may be null
*/
- SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
+ SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
{
assert sstable != null;
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
- this.filter = filter;
- stopAt = dfile.length();
+ this.dataRange = dataRange;
+ this.stopAt = computeStopAt();
+ seekToStart();
}
- public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
+ private void seekToStart()
{
- this(sstable, filter, limiter);
+ if (dataRange.startKey().isMinimum(sstable.partitioner))
+ return;
- long indexPosition = sstable.getIndexScanPosition(startWith);
+ long indexPosition = sstable.getIndexScanPosition(dataRange.startKey());
// -1 means the key is before everything in the sstable. So just start from the beginning.
if (indexPosition == -1)
- indexPosition = 0;
- ifile.seek(indexPosition);
+ return;
+ ifile.seek(indexPosition);
try
{
while (!ifile.isEOF())
{
indexPosition = ifile.getFilePointer();
DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- int comparison = indexDecoratedKey.compareTo(startWith);
+ int comparison = indexDecoratedKey.compareTo(dataRange.startKey());
if (comparison >= 0)
{
// Found, just read the dataPosition and seek into index and data files
@@ -102,19 +106,14 @@ public class SSTableScanner implements ICompactionScanner
}
- public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+ private long computeStopAt()
{
- this(sstable, filter, range.toRowBounds().left, limiter);
+ AbstractBounds<RowPosition> keyRange = dataRange.keyRange();
+ if (dataRange.stopKey().isMinimum(sstable.partitioner) || (keyRange instanceof Range && ((Range)keyRange).isWrapAround()))
+ return dfile.length();
- if (range.isWrapAround())
- {
- stopAt = dfile.length();
- }
- else
- {
- RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
- stopAt = position == null ? dfile.length() : position.position;
- }
+ RowIndexEntry position = sstable.getPosition(keyRange.toRowBounds().right, SSTableReader.Operator.GT);
+ return position == null ? dfile.length() : position.position;
}
public void close() throws IOException
@@ -202,8 +201,7 @@ public class SSTableScanner implements ICompactionScanner
}
assert !dfile.isEOF();
-
- if (filter == null)
+ if (dataRange.selectsFullRowFor(currentKey.key))
{
dfile.seek(currentEntry.position);
ByteBufferUtil.readWithShortLength(dfile); // key
@@ -217,7 +215,7 @@ public class SSTableScanner implements ICompactionScanner
{
public OnDiskAtomIterator create()
{
- return filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+ return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 43e2cb2..4ac408e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -116,6 +116,7 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PREPARE,
PAXOS_PROPOSE,
PAXOS_COMMIT,
+ PAGED_RANGE,
// remember to add new verbs at the end, since we serialize by ordinal
UNUSED_1,
UNUSED_2,
@@ -136,6 +137,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.READ, Stage.READ);
put(Verb.RANGE_SLICE, Stage.READ);
put(Verb.INDEX_SCAN, Stage.READ);
+ put(Verb.PAGED_RANGE, Stage.READ);
put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
@@ -188,6 +190,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.READ_REPAIR, RowMutation.serializer);
put(Verb.READ, ReadCommand.serializer);
put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
+ put(Verb.PAGED_RANGE, RangeSliceCommand.serializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
put(Verb.TREE_RESPONSE, ActiveRepairService.Validator.serializer);
@@ -216,6 +219,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
+ put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
put(Verb.READ, ReadResponse.serializer);
put(Verb.TRUNCATE, TruncateResponse.serializer);
put(Verb.SNAPSHOT, null);
@@ -293,6 +297,7 @@ public final class MessagingService implements MessagingServiceMBean
Verb.READ_REPAIR,
Verb.READ,
Verb.RANGE_SLICE,
+ Verb.PAGED_RANGE,
Verb.REQUEST_RESPONSE);
// total dropped message counts for server lifetime
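
The "add new verbs at the end" comment above is why PAGED_RANGE is appended after PAXOS_COMMIT rather than grouped with the other read verbs: the wire format carries only the enum ordinal. A toy illustration of that constraint (not Cassandra's actual MessagingService code) follows.

// Inserting a verb in the middle would shift the ordinals of every verb after
// it and break messaging between nodes running different versions.
public class VerbOrdinalExample
{
    enum Verb { READ, RANGE_SLICE, PAGED_RANGE } // new verbs appended last

    static int serialize(Verb v)
    {
        return v.ordinal(); // an older node still decodes READ=0, RANGE_SLICE=1 correctly
    }

    static Verb deserialize(int ordinal)
    {
        Verb[] verbs = Verb.values();
        if (ordinal < 0 || ordinal >= verbs.length)
            throw new IllegalArgumentException("Unknown verb ordinal " + ordinal);
        return verbs[ordinal];
    }

    public static void main(String[] args)
    {
        assert deserialize(serialize(Verb.PAGED_RANGE)) == Verb.PAGED_RANGE;
    }
}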
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 6483d9b..38d103d 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -17,9 +17,17 @@
*/
package org.apache.cassandra.service;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.UUID;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -30,6 +38,7 @@ public class QueryState
private final ClientState clientState;
private volatile long clock;
private volatile UUID preparedTracingSession;
+ private volatile Pager pager;
public QueryState(ClientState clientState)
{
@@ -68,6 +77,13 @@ public class QueryState
this.preparedTracingSession = sessionId;
}
+ public UUID getAndResetCurrentTracingSession()
+ {
+ UUID previous = preparedTracingSession;
+ preparedTracingSession = null;
+ return previous;
+ }
+
public void createTracingSession()
{
if (this.preparedTracingSession == null)
@@ -77,9 +93,53 @@ public class QueryState
else
{
UUID session = this.preparedTracingSession;
- this.preparedTracingSession = null;
Tracing.instance.newSession(session);
}
}
-}
+ public void attachPager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+ {
+ pager = new Pager(queryPager, statement, variables);
+ }
+
+ public boolean hasPager()
+ {
+ return pager != null;
+ }
+
+ public void dropPager()
+ {
+ pager = null;
+ }
+
+ public ResultMessage.Rows getNextPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ assert pager != null; // We've already validated (in ServerConnection) that this should not be null
+
+ int currentLimit = pager.queryPager.maxRemaining();
+ List<Row> page = pager.queryPager.fetchPage(pageSize);
+ ResultMessage.Rows msg = pager.statement.processResults(page, pager.variables, currentLimit, pager.queryPager.timestamp());
+
+ if (pager.queryPager.isExhausted())
+ dropPager();
+ else
+ msg.result.metadata.setHasMorePages();
+
+ return msg;
+ }
+
+ // Groups the query pager with the SelectStatement being paged and its bind variables
+ private static class Pager
+ {
+ private final QueryPager queryPager;
+ private final SelectStatement statement;
+ private final List<ByteBuffer> variables;
+
+ private Pager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+ {
+ this.queryPager = queryPager;
+ this.statement = statement;
+ this.variables = variables;
+ }
+ }
+}
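
To show how the pager attached to QueryState above is typically consumed: getNextPage() is called with a page size until the underlying QueryPager reports it is exhausted, at which point the pager is dropped. The sketch below captures that loop with simplified stand-in types (SimplePager is hypothetical, not a Cassandra interface).

import java.util.List;

// Hedged sketch of a page-draining loop: fetch pages of a fixed size until
// the pager is exhausted. The "has more pages" flag set on the result
// metadata is what tells a native-protocol client to ask for the next page.
public class PagingLoopExample
{
    interface SimplePager<R>
    {
        List<R> fetchPage(int pageSize);
        boolean isExhausted();
    }

    static <R> void drain(SimplePager<R> pager, int pageSize)
    {
        while (!pager.isExhausted())
        {
            List<R> page = pager.fetchPage(pageSize);
            // process the page (here we only report its size)
            System.out.println("Fetched " + page.size() + " rows");
        }
    }
}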
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index f63fcb1..9a7d1d2 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -19,39 +19,16 @@ package org.apache.cassandra.service;
import java.util.List;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.AbstractRangeCommand;
import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tracing.Tracing;
-public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
+public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
{
- public static List<Row> executeLocally(RangeSliceCommand command)
- {
- ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- if (cfs.indexManager.hasIndexFor(command.row_filter))
- return cfs.search(command.range,
- command.row_filter,
- command.predicate,
- command.maxResults,
- command.timestamp,
- command.countCQL3Rows);
- else
- return cfs.getRangeSlice(command.range,
- command.row_filter,
- command.predicate,
- command.maxResults,
- command.timestamp,
- command.countCQL3Rows,
- command.isPaging);
- }
-
- public void doVerb(MessageIn<RangeSliceCommand> message, int id)
+ public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
{
try
{
@@ -60,7 +37,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
/* Don't service reads! */
throw new RuntimeException("Cannot service reads while bootstrapping!");
}
- RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload));
+ RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
Tracing.trace("Enqueuing response to {}", message.from);
MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
}
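
The handler change above replaces a static executeLocally(RangeSliceCommand) helper with a call to the command's own executeLocally(), so the verb handler no longer needs to know which concrete range command it received. A minimal sketch of that dispatch pattern, using illustrative stand-in types rather than the real Cassandra classes:

import java.util.Collections;
import java.util.List;

// Each command subclass knows how to execute itself locally; the handler
// just delegates, which is what lets PAGED_RANGE reuse this verb handler.
public class CommandDispatchExample
{
    abstract static class AbstractRangeCommandSketch
    {
        abstract List<String> executeLocally();
    }

    static class SliceCommand extends AbstractRangeCommandSketch
    {
        List<String> executeLocally() { return Collections.singletonList("slice result"); }
    }

    static class PagedCommand extends AbstractRangeCommandSketch
    {
        List<String> executeLocally() { return Collections.singletonList("paged result"); }
    }

    static List<String> handle(AbstractRangeCommandSketch command)
    {
        return command.executeLocally();
    }
}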
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 612f89b..0286bd3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1294,11 +1294,11 @@ public class StorageProxy implements StorageProxyMBean
static class LocalRangeSliceRunnable extends DroppableRunnable
{
- private final RangeSliceCommand command;
+ private final AbstractRangeCommand command;
private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
private final long start = System.nanoTime();
- LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+ LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
{
super(MessagingService.Verb.READ);
this.command = command;
@@ -1307,7 +1307,7 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
+ RangeSliceReply result = new RangeSliceReply(command.executeLocally());
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
handler.response(result);
}
@@ -1337,7 +1337,7 @@ public class StorageProxy implements StorageProxyMBean
return inter;
}
- public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
+ public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
throws UnavailableException, ReadTimeoutException
{
Tracing.trace("Determining replicas to query");
@@ -1348,11 +1348,9 @@ public class StorageProxy implements StorageProxyMBean
// now scan until we have enough results
try
{
- IDiskAtomFilter commandPredicate = command.predicate;
-
int cql3RowCount = 0;
rows = new ArrayList<Row>();
- List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+ List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.keyRange);
int i = 0;
AbstractBounds<RowPosition> nextRange = null;
List<InetAddress> nextEndpoints = null;
@@ -1408,15 +1406,7 @@ public class StorageProxy implements StorageProxyMBean
++i;
}
- RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
- command.column_family,
- command.timestamp,
- commandPredicate,
- range,
- command.row_filter,
- command.maxResults,
- command.countCQL3Rows,
- command.isPaging);
+ AbstractRangeCommand nodeCmd = command.forSubRange(range);
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
@@ -1431,7 +1421,7 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
- MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
+ MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
Tracing.trace("Enqueuing request to {}", endpoint);
@@ -1444,8 +1434,8 @@ public class StorageProxy implements StorageProxyMBean
for (Row row : handler.get())
{
rows.add(row);
- if (nodeCmd.countCQL3Rows)
- cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
+ if (nodeCmd.countCQL3Rows())
+ cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
@@ -1462,18 +1452,9 @@ public class StorageProxy implements StorageProxyMBean
}
// if we're done, great, otherwise, move to the next range
- int count = nodeCmd.countCQL3Rows ? cql3RowCount : rows.size();
- if (count >= nodeCmd.maxResults)
+ int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
+ if (count >= nodeCmd.limit())
break;
-
- // if we are paging and already got some rows, reset the column filter predicate,
- // so we start iterating the next row from the first column
- if (!rows.isEmpty() && command.isPaging)
- {
- // We only allow paging with a slice filter (doesn't make sense otherwise anyway)
- assert commandPredicate instanceof SliceQueryFilter;
- commandPredicate = ((SliceQueryFilter)commandPredicate).withUpdatedSlices(ColumnSlice.ALL_COLUMNS_ARRAY);
- }
}
}
finally
@@ -1483,13 +1464,13 @@ public class StorageProxy implements StorageProxyMBean
return trim(command, rows);
}
- private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
+ private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
{
- // When countCQL3Rows, we let the caller trim the result.
- if (command.countCQL3Rows)
+ // When countCQL3Rows is set, we let the caller trim the result.
+ if (command.countCQL3Rows())
return rows;
else
- return rows.size() > command.maxResults ? rows.subList(0, command.maxResults) : rows;
+ return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
}
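
The getRangeSlice()/trim() changes above keep the same overall shape: query one restricted sub-range at a time, stop as soon as the requested limit is reached, then trim any overshoot from the last sub-range. The sketch below shows that control flow with simplified stand-in types (SubRangeQuery is hypothetical, not a Cassandra interface).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Scan sub-ranges until enough rows are collected, then trim to the limit.
public class RangeScanUntilLimitExample
{
    interface SubRangeQuery
    {
        List<String> execute(); // rows for one restricted sub-range
    }

    static List<String> getRangeSlice(List<SubRangeQuery> subRanges, int limit)
    {
        List<String> rows = new ArrayList<>();
        for (SubRangeQuery query : subRanges)
        {
            rows.addAll(query.execute());
            // if we're done, great; otherwise move to the next range
            if (rows.size() >= limit)
                break;
        }
        // the last sub-range may have pushed us past the limit
        return rows.size() > limit ? rows.subList(0, limit) : rows;
    }

    public static void main(String[] args)
    {
        SubRangeQuery a = () -> Arrays.asList("r1", "r2");
        SubRangeQuery b = () -> Arrays.asList("r3", "r4");
        System.out.println(getRangeSlice(Arrays.asList(a, b), 3)); // [r1, r2, r3]
    }
}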
/**