You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/11/21 14:04:28 UTC
[2/2] git commit: Fix CQL3 limit
Fix CQL3 limit
patch by slebresne; reviewed by jbellis for CASSANDRA-4877
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a32eb9f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a32eb9f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a32eb9f7
Branch: refs/heads/cassandra-1.2
Commit: a32eb9f7d2f2868e8154d178e96e045859e1d855
Parents: e39bf7a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Nov 21 14:03:32 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Nov 21 14:03:32 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/cql3/statements/SelectStatement.java | 11 ++---
.../cassandra/db/AbstractColumnContainer.java | 9 +--
.../apache/cassandra/db/CollationController.java | 5 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 8 ++--
.../org/apache/cassandra/db/RangeSliceCommand.java | 20 ++++----
src/java/org/apache/cassandra/db/Row.java | 11 ++--
.../org/apache/cassandra/db/SliceQueryPager.java | 5 +-
.../apache/cassandra/db/filter/ExtendedFilter.java | 38 +++++++-------
.../cassandra/db/filter/IDiskAtomFilter.java | 12 +++--
.../cassandra/db/filter/NamesQueryFilter.java | 41 ++++++++++++++-
.../cassandra/db/filter/SliceQueryFilter.java | 12 ++++-
.../cassandra/db/index/SecondaryIndexManager.java | 4 +-
.../cassandra/db/index/SecondaryIndexSearcher.java | 2 +-
.../db/index/composites/CompositesSearcher.java | 8 ++--
.../cassandra/db/index/keys/KeysSearcher.java | 4 +-
.../cassandra/service/RangeSliceVerbHandler.java | 4 +-
.../cassandra/service/RowRepairResolver.java | 6 +-
.../org/apache/cassandra/service/StorageProxy.java | 24 +++++----
.../apache/cassandra/db/ColumnFamilyStoreTest.java | 2 +-
20 files changed, 139 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cdd651a..60e4c94 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
* Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956)
* Remove select arbitrary limit (CASSANDRA-4918)
* Correctly handle prepared operation on collections (CASSANDRA-4945)
+ * Fix CQL3 LIMIT (CASSANDRA-4877)
Merged from 1.1:
* add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
* fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5963e0e..4ae2b55 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -236,10 +236,7 @@ public class SelectStatement implements CQLStatement
IDiskAtomFilter filter = makeFilter(variables);
List<IndexExpression> expressions = getIndexExpressions(variables);
// The LIMIT provided by the user is the number of CQL row he wants returned.
- // For NamesQueryFilter, this is the number of internal rows returned, since a NamesQueryFilter can only select one CQL row in a given internal row.
- // For SliceQueryFilter however, we want to have getRangeSlice to count the number of columns, not the number of keys. Then
- // SliceQueryFilter.collectReducedColumns will correctly columns having the same composite prefix using ColumnCounter.
- boolean maxIsColumns = filter instanceof SliceQueryFilter;
+ // We want to have getRangeSlice to count the number of columns, not the number of keys.
return new RangeSliceCommand(keyspace(),
columnFamily(),
null,
@@ -247,7 +244,7 @@ public class SelectStatement implements CQLStatement
getKeyBounds(variables),
expressions,
getLimit(),
- maxIsColumns,
+ true,
false);
}
@@ -320,7 +317,7 @@ public class SelectStatement implements CQLStatement
{
SortedSet<ByteBuffer> columnNames = getRequestedColumns(variables);
QueryProcessor.validateColumnNames(columnNames);
- return new NamesQueryFilter(columnNames);
+ return new NamesQueryFilter(columnNames, true);
}
}
@@ -813,7 +810,7 @@ public class SelectStatement implements CQLStatement
}
else
{
- if (row.cf.getLiveColumnCount() == 0)
+ if (row.cf.hasOnlyTombstones())
continue;
// Static case: One cqlRow for all columns
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index ab93c54..09d0a38 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -164,17 +164,14 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
return getColumnCount();
}
- public int getLiveColumnCount()
+ public boolean hasOnlyTombstones()
{
- int count = 0;
-
for (IColumn column : columns)
{
if (column.isLive())
- count++;
+ return false;
}
-
- return count;
+ return true;
}
public Iterator<IColumn> iterator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 73c675f..7160b62 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -99,8 +99,9 @@ public class CollationController
// avoid changing the filter columns of the original filter
// (reduceNameFilter removes columns that are known to be irrelevant)
- TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(((NamesQueryFilter) filter.filter).columns);
- QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns));
+ NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
+ TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
+ QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, namesFilter.withUpdatedColumns(filterColumns));
/* add the SSTables on disk */
Collections.sort(view.sstables, SSTable.maxTimestampComparator);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 439ef5f..7e4355a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1454,9 +1454,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return getRangeSlice(superColumn, range, maxResults, columnFilter, rowFilter, false, false);
}
- public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean maxIsColumns, boolean isPaging)
+ public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
{
- return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, maxIsColumns, isPaging));
+ return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
}
public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter)
@@ -1464,10 +1464,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return search(clause, range, maxResults, dataFilter, false);
}
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns)
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
{
Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
- return indexManager.search(clause, range, maxResults, dataFilter, maxIsColumns);
+ return indexManager.search(clause, range, maxResults, dataFilter, countCQL3Rows);
}
public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index b8fdfd6..1748abd 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -83,7 +83,7 @@ public class RangeSliceCommand implements IReadCommand
public final AbstractBounds<RowPosition> range;
public final int maxResults;
- public final boolean maxIsColumns;
+ public final boolean countCQL3Rows;
public final boolean isPaging;
public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
@@ -101,7 +101,7 @@ public class RangeSliceCommand implements IReadCommand
this(keyspace, column_family, super_column, predicate, range, row_filter, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean maxIsColumns, boolean isPaging)
+ public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
{
this.keyspace = keyspace;
this.column_family = column_family;
@@ -110,7 +110,7 @@ public class RangeSliceCommand implements IReadCommand
this.range = range;
this.row_filter = row_filter;
this.maxResults = maxResults;
- this.maxIsColumns = maxIsColumns;
+ this.countCQL3Rows = countCQL3Rows;
this.isPaging = isPaging;
}
@@ -130,7 +130,7 @@ public class RangeSliceCommand implements IReadCommand
", range=" + range +
", row_filter =" + row_filter +
", maxResults=" + maxResults +
- ", maxIsColumns=" + maxIsColumns +
+ ", countCQL3Rows=" + countCQL3Rows +
'}';
}
@@ -143,7 +143,7 @@ public class RangeSliceCommand implements IReadCommand
public IndexScanCommand toIndexScanCommand()
{
assert row_filter != null && !row_filter.isEmpty();
- if (maxIsColumns || isPaging)
+ if (countCQL3Rows || isPaging)
throw new IllegalStateException("Cannot proceed with range query as the remote end has a version < 1.1. Please update the full cluster first.");
CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, column_family);
@@ -240,7 +240,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
dos.writeInt(sliceCommand.maxResults);
if (version >= MessagingService.VERSION_11)
{
- dos.writeBoolean(sliceCommand.maxIsColumns);
+ dos.writeBoolean(sliceCommand.countCQL3Rows);
dos.writeBoolean(sliceCommand.isPaging);
}
}
@@ -297,14 +297,14 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(dis, version).toRowBounds();
int maxResults = dis.readInt();
- boolean maxIsColumns = false;
+ boolean countCQL3Rows = false;
boolean isPaging = false;
if (version >= MessagingService.VERSION_11)
{
- maxIsColumns = dis.readBoolean();
+ countCQL3Rows = dis.readBoolean();
isPaging = dis.readBoolean();
}
- return new RangeSliceCommand(keyspace, columnFamily, superColumn, predicate, range, rowFilter, maxResults, maxIsColumns, isPaging);
+ return new RangeSliceCommand(keyspace, columnFamily, superColumn, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
}
public long serializedSize(RangeSliceCommand rsc, int version)
@@ -380,7 +380,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
if (version >= MessagingService.VERSION_11)
{
- size += TypeSizes.NATIVE.sizeof(rsc.maxIsColumns);
+ size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
size += TypeSizes.NATIVE.sizeof(rsc.isPaging);
}
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 0c129e2..74cd906 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
import java.io.*;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageService;
@@ -39,11 +40,6 @@ public class Row
this.cf = cf;
}
- public int getLiveColumnCount()
- {
- return cf == null ? 0 : cf.getLiveColumnCount();
- }
-
@Override
public String toString()
{
@@ -53,6 +49,11 @@ public class Row
')';
}
+ public int getLiveCount(IDiskAtomFilter filter)
+ {
+ return cf == null ? 0 : filter.getLiveCount(cf);
+ }
+
public static class RowSerializer implements IVersionedSerializer<Row>
{
public void serialize(Row row, DataOutput dos, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index b67c071..0f45f8c 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -52,9 +52,10 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
return null;
QueryPath path = new QueryPath(cfs.getColumnFamilyName());
- QueryFilter filter = new QueryFilter(key, path, new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE));
+ SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
+ QueryFilter filter = new QueryFilter(key, path, sliceFilter);
ColumnFamily cf = cfs.getColumnFamily(filter);
- if (cf == null || cf.getLiveColumnCount() < DEFAULT_PAGE_SIZE)
+ if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
{
exhausted = true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 65c4563..4772c53 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -44,35 +44,35 @@ public abstract class ExtendedFilter
public final ColumnFamilyStore cfs;
protected final IDiskAtomFilter originalFilter;
private final int maxResults;
- private final boolean maxIsColumns;
+ private final boolean countCQL3Rows;
private final boolean isPaging;
- public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns, boolean isPaging)
+ public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows, boolean isPaging)
{
if (clause == null || clause.isEmpty())
{
- return new EmptyClauseFilter(cfs, filter, maxResults, maxIsColumns, isPaging);
+ return new EmptyClauseFilter(cfs, filter, maxResults, countCQL3Rows, isPaging);
}
else
{
if (isPaging)
throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
return cfs.getComparator() instanceof CompositeType
- ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, maxIsColumns)
- : new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns);
+ ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, countCQL3Rows)
+ : new FilterWithClauses(cfs, filter, clause, maxResults, countCQL3Rows);
}
}
- protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean maxIsColumns, boolean isPaging)
+ protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
{
assert cfs != null;
assert filter != null;
this.cfs = cfs;
this.originalFilter = filter;
this.maxResults = maxResults;
- this.maxIsColumns = maxIsColumns;
+ this.countCQL3Rows = countCQL3Rows;
this.isPaging = isPaging;
- if (maxIsColumns)
+ if (countCQL3Rows)
originalFilter.updateColumnsLimit(maxResults);
if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0))
throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column");
@@ -80,12 +80,12 @@ public abstract class ExtendedFilter
public int maxRows()
{
- return maxIsColumns ? Integer.MAX_VALUE : maxResults;
+ return countCQL3Rows ? Integer.MAX_VALUE : maxResults;
}
public int maxColumns()
{
- return maxIsColumns ? maxResults : Integer.MAX_VALUE;
+ return countCQL3Rows ? maxResults : Integer.MAX_VALUE;
}
/**
@@ -98,7 +98,7 @@ public abstract class ExtendedFilter
if (isPaging)
((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
- if (!maxIsColumns)
+ if (!countCQL3Rows)
return;
int remaining = maxResults - currentColumnsCount;
@@ -110,7 +110,7 @@ public abstract class ExtendedFilter
if (initialFilter() instanceof SliceQueryFilter)
return ((SliceQueryFilter)initialFilter()).lastCounted();
else
- return data.getLiveColumnCount();
+ return initialFilter().getLiveCount(data);
}
/** The initial filter we'll do our first slice with (either the original or a superset of it) */
@@ -165,9 +165,9 @@ public abstract class ExtendedFilter
protected final List<IndexExpression> clause;
protected final IDiskAtomFilter initialFilter;
- public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns)
+ public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
{
- super(cfs, filter, maxResults, maxIsColumns, false);
+ super(cfs, filter, maxResults, countCQL3Rows, false);
assert clause != null;
this.clause = clause;
this.initialFilter = computeInitialFilter();
@@ -201,7 +201,7 @@ public abstract class ExtendedFilter
columns.add(expr.column_name);
}
columns.addAll(((NamesQueryFilter) originalFilter).columns);
- return new NamesQueryFilter(columns);
+ return ((NamesQueryFilter)originalFilter).withUpdatedColumns(columns);
}
}
return originalFilter;
@@ -294,9 +294,9 @@ public abstract class ExtendedFilter
private static class FilterWithCompositeClauses extends FilterWithClauses
{
- public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns)
+ public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
{
- super(cfs, filter, clause, maxResults, maxIsColumns);
+ super(cfs, filter, clause, maxResults, countCQL3Rows);
}
/*
@@ -318,9 +318,9 @@ public abstract class ExtendedFilter
private static class EmptyClauseFilter extends ExtendedFilter
{
- public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean maxIsColumns, boolean isPaging)
+ public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
{
- super(cfs, filter, maxResults, maxIsColumns, isPaging);
+ super(cfs, filter, maxResults, countCQL3Rows, isPaging);
}
public IDiskAtomFilter initialFilter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 9805659..f1d9611 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -44,7 +44,7 @@ public interface IDiskAtomFilter
* returns an iterator that returns columns from the given memtable
* matching the Filter criteria in sorted order.
*/
- public abstract OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
+ public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
/**
* Get an iterator that returns columns from the given SSTable using the opened file
@@ -53,32 +53,34 @@ public interface IDiskAtomFilter
* @param file Already opened file data input, saves us opening another one
* @param key The key of the row we are about to iterate over
*/
- public abstract ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
+ public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
/**
* returns an iterator that returns columns from the given SSTable
* matching the Filter criteria in sorted order.
*/
- public abstract ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);
+ public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);
/**
* collects columns from reducedColumns into returnCF. Termination is determined
* by the filter code, which should have some limit on the number of columns
* to avoid running out of memory on large rows.
*/
- public abstract void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore);
+ public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore);
/**
* subcolumns of a supercolumn are unindexed, so to pick out parts of those we operate in-memory.
* @param superColumn may be modified by filtering op.
*/
- public abstract SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore);
+ public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore);
public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator);
public boolean isReversed();
public void updateColumnsLimit(int newLimit);
+ public int getLiveCount(ColumnFamily cf);
+
public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
{
public static Serializer instance = new Serializer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index a347926..0581e12 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -45,9 +46,19 @@ public class NamesQueryFilter implements IDiskAtomFilter
public final SortedSet<ByteBuffer> columns;
+ // If true, getLiveCount will always return either 0 or 1. This uses the fact that we know
+ // CQL3 will never use a name filter with cell names spanning multiple CQL3 rows.
+ private final boolean countCQL3Rows;
+
public NamesQueryFilter(SortedSet<ByteBuffer> columns)
{
+ this(columns, false);
+ }
+
+ public NamesQueryFilter(SortedSet<ByteBuffer> columns, boolean countCQL3Rows)
+ {
this.columns = columns;
+ this.countCQL3Rows = countCQL3Rows;
}
public NamesQueryFilter(ByteBuffer column)
@@ -55,6 +66,11 @@ public class NamesQueryFilter implements IDiskAtomFilter
this(FBUtilities.singleton(column));
}
+ public NamesQueryFilter withUpdatedColumns(SortedSet<ByteBuffer> newColumns)
+ {
+ return new NamesQueryFilter(newColumns, countCQL3Rows);
+ }
+
public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
{
return Memtable.getNamesIterator(key, cf, this);
@@ -114,6 +130,20 @@ public class NamesQueryFilter implements IDiskAtomFilter
{
}
+ public int getLiveCount(ColumnFamily cf)
+ {
+ if (countCQL3Rows)
+ return cf.hasOnlyTombstones() ? 0 : 1;
+
+ int count = 0;
+ for (IColumn column : cf)
+ {
+ if (column.isLive())
+ count++;
+ }
+ return count;
+ }
+
public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
{
public void serialize(NamesQueryFilter f, DataOutput dos, int version) throws IOException
@@ -123,6 +153,10 @@ public class NamesQueryFilter implements IDiskAtomFilter
{
ByteBufferUtil.writeWithShortLength(cName, dos);
}
+ // If we are talking to an older node, we have no way to tell it that we want to count CQL3 rows. This does mean that
+ // this node may return less data than required. The workaround is to upgrade all nodes.
+ if (version >= MessagingService.VERSION_12)
+ dos.writeBoolean(f.countCQL3Rows);
}
public NamesQueryFilter deserialize(DataInput dis, int version) throws IOException
@@ -136,7 +170,10 @@ public class NamesQueryFilter implements IDiskAtomFilter
SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(comparator);
for (int i = 0; i < size; ++i)
columns.add(ByteBufferUtil.readWithShortLength(dis));
- return new NamesQueryFilter(columns);
+ boolean countCQL3Rows = version >= MessagingService.VERSION_12
+ ? dis.readBoolean()
+ : false;
+ return new NamesQueryFilter(columns, countCQL3Rows);
}
public long serializedSize(NamesQueryFilter f, int version)
@@ -148,6 +185,8 @@ public class NamesQueryFilter implements IDiskAtomFilter
int cNameSize = cName.remaining();
size += sizes.sizeof((short) cNameSize) + cNameSize;
}
+ if (version >= MessagingService.VERSION_12)
+ size += sizes.sizeof(f.countCQL3Rows);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 7ef7977..2971151 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -58,6 +58,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count);
}
+ public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count, int compositesToGroup)
+ {
+ this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count, compositesToGroup, 1);
+ }
+
/**
* Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or
* reversed).
@@ -81,6 +86,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup, countMutliplierForCompatibility);
}
+ public SliceQueryFilter withUpdatedSlices(ColumnSlice[] newSlices)
+ {
+ return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup, countMutliplierForCompatibility);
+ }
+
public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
{
return Memtable.getSliceIterator(key, cf, this);
@@ -242,7 +252,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
@Override
public String toString()
{
- return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + "]";
+ return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + ", toGroup = " + compositesToGroup + "]";
}
public boolean isReversed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 5a3da59..1be04dd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -534,7 +534,7 @@ public class SecondaryIndexManager
* @param dataFilter the column range to restrict to
* @return found indexed rows
*/
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns)
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
{
List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
@@ -546,7 +546,7 @@ public class SecondaryIndexManager
throw new RuntimeException("Unable to search across multiple secondary index types");
- return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns);
+ return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, countCQL3Rows);
}
public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index d49169c..a8c1dde 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -38,7 +38,7 @@ public abstract class SecondaryIndexSearcher
this.baseCfs = indexManager.baseCfs;
}
- public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns);
+ public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows);
/**
* @return true this index is able to handle given clauses.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index d5a2611..4817a00 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -78,10 +78,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns)
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
{
assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, maxIsColumns, false);
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
return baseCfs.filter(getIndexedIterator(range, filter), filter);
}
@@ -301,7 +301,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (!originalFilter.includes(baseComparator, start))
continue;
- SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE);
+ SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE, prefixSize);
ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter));
if (newData != null)
{
@@ -322,7 +322,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (data == null)
data = ColumnFamily.create(baseCfs.metadata);
data.resolve(newData);
- columnsCount += newData.getLiveColumnCount();
+ columnsCount += dataFilter.lastCounted();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index a07f773..4be7988 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -75,10 +75,10 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns)
+ public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
{
assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, maxIsColumns, false);
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
return baseCfs.filter(getIndexedIterator(range, filter), filter);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index ef7beaa..ba8283f 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -41,9 +41,9 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
{
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
if (cfs.indexManager.hasIndexFor(command.row_filter))
- return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.maxIsColumns);
+ return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.countCQL3Rows);
else
- return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, command.predicate, command.row_filter, command.maxIsColumns, command.isPaging);
+ return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging);
}
public void doVerb(MessageIn<RangeSliceCommand> message, String id)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
index 975b204..21cf5ab 100644
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java
@@ -44,12 +44,12 @@ public class RowRepairResolver extends AbstractRowResolver
{
private int maxLiveCount = 0;
public List<IAsyncResult> repairResults = Collections.emptyList();
- private final SliceQueryFilter filter; // can be null if names query
+ private final IDiskAtomFilter filter;
public RowRepairResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
{
super(key, table);
- this.filter = qFilter instanceof SliceQueryFilter ? (SliceQueryFilter)qFilter : null;
+ this.filter = qFilter;
}
/*
@@ -80,7 +80,7 @@ public class RowRepairResolver extends AbstractRowResolver
endpoints.add(message.from);
// compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
- int liveCount = cf == null ? 0 : (filter == null ? cf.getLiveColumnCount() : filter.getLiveCount(cf));
+ int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
if (liveCount > maxLiveCount)
maxLiveCount = liveCount;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b747075..0c3eae9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -1093,13 +1094,9 @@ public class StorageProxy implements StorageProxyMBean
// now scan until we have enough results
try
{
- final IDiskAtomFilter emptyPredicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- -1);
IDiskAtomFilter commandPredicate = command.predicate;
- int columnsCount = 0;
+ int cql3RowCount = 0;
rows = new ArrayList<Row>();
List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
for (AbstractBounds<RowPosition> range : ranges)
@@ -1111,7 +1108,7 @@ public class StorageProxy implements StorageProxyMBean
range,
command.row_filter,
command.maxResults,
- command.maxIsColumns,
+ command.countCQL3Rows,
command.isPaging);
List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
@@ -1144,7 +1141,8 @@ public class StorageProxy implements StorageProxyMBean
for (Row row : handler.get())
{
rows.add(row);
- columnsCount += row.getLiveColumnCount();
+ if (nodeCmd.countCQL3Rows)
+ cql3RowCount += row.getLiveCount(commandPredicate);
logger.trace("range slices read {}", row.key);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
@@ -1162,14 +1160,18 @@ public class StorageProxy implements StorageProxyMBean
}
// if we're done, great, otherwise, move to the next range
- int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
+ int count = nodeCmd.countCQL3Rows ? cql3RowCount : rows.size();
if (count >= nodeCmd.maxResults)
break;
// if we are paging and already got some rows, reset the column filter predicate,
// so we start iterating the next row from the first column
if (!rows.isEmpty() && command.isPaging)
- commandPredicate = emptyPredicate;
+ {
+ // We only allow paging with a slice filter (doesn't make sense otherwise anyway)
+ assert commandPredicate instanceof SliceQueryFilter;
+ commandPredicate = ((SliceQueryFilter)commandPredicate).withUpdatedSlices(ColumnSlice.ALL_COLUMNS_ARRAY);
+ }
}
}
finally
@@ -1189,8 +1191,8 @@ public class StorageProxy implements StorageProxyMBean
private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
{
- // When maxIsColumns, we let the caller trim the result.
- if (command.maxIsColumns)
+ // When countCQL3Rows, we let the caller trim the result.
+ if (command.countCQL3Rows)
return rows;
else
return rows.size() > command.maxResults ? rows.subList(0, command.maxResults) : rows;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 625d25f..cd34a91 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -948,7 +948,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
int columns = 0;
for (Row row : rows)
{
- columns += row.getLiveColumnCount();
+ columns += row.getLiveCount(new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, expectedCount));
}
assert columns == expectedCount : "Expected " + expectedCount + " live columns but got " + columns + ": " + rows;
}