You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/08/04 15:13:51 UTC
[3/4] cassandra git commit: Add support for GROUP BY to SELECT statement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 85cae0c..abe029b 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.aggregation.GroupMaker;
+import org.apache.cassandra.db.aggregation.GroupingState;
+import org.apache.cassandra.db.aggregation.AggregationSpecification;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.transform.BasePartitions;
@@ -68,7 +71,7 @@ public abstract class DataLimits
// partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
- public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
+ public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT, CQL_GROUP_BY_LIMIT, CQL_GROUP_BY_PAGING_LIMIT }
public static DataLimits cqlLimits(int cqlRowLimit)
{
@@ -89,6 +92,14 @@ public abstract class DataLimits
: new CQLLimits(cqlRowLimit, perPartitionLimit, isDistinct);
}
+ public static DataLimits groupByLimits(int groupLimit,
+ int groupPerPartitionLimit,
+ int rowLimit,
+ AggregationSpecification groupBySpec)
+ {
+ return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
+ }
+
public static DataLimits distinctLimits(int cqlRowLimit)
{
return CQLLimits.distinct(cqlRowLimit);
@@ -109,11 +120,32 @@ public abstract class DataLimits
public abstract boolean isUnlimited();
public abstract boolean isDistinct();
+ public boolean isGroupByLimit()
+ {
+ return false;
+ }
+
+ public boolean isExhausted(Counter counter)
+ {
+ return counter.counted() < count();
+ }
+
public abstract DataLimits forPaging(int pageSize);
public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
public abstract DataLimits forShortReadRetry(int toFetch);
+ /**
+ * Creates a <code>DataLimits</code> instance to be used for paginating internally GROUP BY queries.
+ *
+ * @param state the <code>GroupMaker</code> state
+ * @return a <code>DataLimits</code> instance to be used for paginating internally GROUP BY queries
+ */
+ public DataLimits forGroupByInternalPaging(GroupingState state)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec);
/**
@@ -139,6 +171,12 @@ public abstract class DataLimits
public abstract int perPartitionCount();
+ /**
+ * Returns equivalent limits, but with any internal state used to track where we are in paging and/or
+ * grouping discarded.
+ */
+ public abstract DataLimits withoutState();
+
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
{
return this.newCounter(nowInSec, false).applyTo(iter);
@@ -162,9 +200,18 @@ public abstract class DataLimits
public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
{
+ protected final int nowInSec;
+ protected final boolean assumeLiveData;
+
// false means we do not propagate our stop signals onto the iterator, we only count
private boolean enforceLimits = true;
+ protected Counter(int nowInSec, boolean assumeLiveData)
+ {
+ this.nowInSec = nowInSec;
+ this.assumeLiveData = assumeLiveData;
+ }
+
public Counter onlyCount()
{
this.enforceLimits = false;
@@ -199,11 +246,31 @@ public abstract class DataLimits
* @return the number of results counted.
*/
public abstract int counted();
+
public abstract int countedInCurrentPartition();
+ /**
+ * The number of rows counted.
+ *
+ * @return the number of rows counted.
+ */
+ public abstract int rowCounted();
+
+ /**
+ * The number of rows counted in the current partition.
+ *
+ * @return the number of rows counted in the current partition.
+ */
+ public abstract int rowCountedInCurrentPartition();
+
public abstract boolean isDone();
public abstract boolean isDoneForPartition();
+ protected boolean isLive(Row row)
+ {
+ return assumeLiveData || row.hasLiveData(nowInSec);
+ }
+
@Override
protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{
@@ -232,6 +299,12 @@ public abstract class DataLimits
if (isDoneForPartition())
stopInPartition();
}
+
+ @Override
+ public void onClose()
+ {
+ super.onClose();
+ }
}
/**
@@ -341,6 +414,11 @@ public abstract class DataLimits
return perPartitionLimit;
}
+ public DataLimits withoutState()
+ {
+ return this;
+ }
+
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
@@ -351,9 +429,6 @@ public abstract class DataLimits
protected class CQLCounter extends Counter
{
- protected final int nowInSec;
- protected final boolean assumeLiveData;
-
protected int rowCounted;
protected int rowInCurrentPartition;
@@ -361,21 +436,20 @@ public abstract class DataLimits
public CQLCounter(int nowInSec, boolean assumeLiveData)
{
- this.nowInSec = nowInSec;
- this.assumeLiveData = assumeLiveData;
+ super(nowInSec, assumeLiveData);
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowInCurrentPartition = 0;
- hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec));
+ hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}
@Override
public Row applyToRow(Row row)
{
- if (assumeLiveData || row.hasLiveData(nowInSec))
+ if (isLive(row))
incrementRowCount();
return row;
}
@@ -391,7 +465,7 @@ public abstract class DataLimits
super.onPartitionClose();
}
- private void incrementRowCount()
+ protected void incrementRowCount()
{
if (++rowCounted >= rowLimit)
stop();
@@ -409,6 +483,16 @@ public abstract class DataLimits
return rowInCurrentPartition;
}
+ public int rowCounted()
+ {
+ return rowCounted;
+ }
+
+ public int rowCountedInCurrentPartition()
+ {
+ return rowInCurrentPartition;
+ }
+
public boolean isDone()
{
return rowCounted >= rowLimit;
@@ -470,6 +554,12 @@ public abstract class DataLimits
}
@Override
+ public DataLimits withoutState()
+ {
+ return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
+ }
+
+ @Override
public Counter newCounter(int nowInSec, boolean assumeLiveData)
{
return new PagingAwareCounter(nowInSec, assumeLiveData);
@@ -503,6 +593,499 @@ public abstract class DataLimits
}
/**
+ * <code>CQLLimits</code> used for GROUP BY queries or queries with aggregates.
+ * <p>Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. By consequence,
+ * the limits keep track of the number of rows as well as the number of groups.</p>
+ * <p>A group can only be counted if the next group or the end of the data is reached.</p>
+ */
+ private static class CQLGroupByLimits extends CQLLimits
+ {
+ /**
+ * The <code>GroupMaker</code> state
+ */
+ protected final GroupingState state;
+
+ /**
+ * The GROUP BY specification
+ */
+ protected final AggregationSpecification groupBySpec;
+
+ /**
+ * The limit on the number of groups
+ */
+ protected final int groupLimit;
+
+ /**
+ * The limit on the number of groups per partition
+ */
+ protected final int groupPerPartitionLimit;
+
+ public CQLGroupByLimits(int groupLimit,
+ int groupPerPartitionLimit,
+ int rowLimit,
+ AggregationSpecification groupBySpec)
+ {
+ this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE);
+ }
+
+ private CQLGroupByLimits(int groupLimit,
+ int groupPerPartitionLimit,
+ int rowLimit,
+ AggregationSpecification groupBySpec,
+ GroupingState state)
+ {
+ super(rowLimit, NO_LIMIT, false);
+ this.groupLimit = groupLimit;
+ this.groupPerPartitionLimit = groupPerPartitionLimit;
+ this.groupBySpec = groupBySpec;
+ this.state = state;
+ }
+
+ @Override
+ public Kind kind()
+ {
+ return Kind.CQL_GROUP_BY_LIMIT;
+ }
+
+ @Override
+ public boolean isGroupByLimit()
+ {
+ return true;
+ }
+
+ public boolean isUnlimited()
+ {
+ return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT;
+ }
+
+ public DataLimits forShortReadRetry(int toFetch)
+ {
+ return new CQLLimits(toFetch);
+ }
+
+ @Override
+ public float estimateTotalResults(ColumnFamilyStore cfs)
+ {
+ // For the moment, we return the estimated number of rows as we have no good way of estimating
+ // the number of groups that will be returned. Hopefully, we should be able to fix
+ // that problem at some point.
+ return super.estimateTotalResults(cfs);
+ }
+
+ @Override
+ public DataLimits forPaging(int pageSize)
+ {
+ return new CQLGroupByLimits(pageSize,
+ groupPerPartitionLimit,
+ rowLimit,
+ groupBySpec,
+ state);
+ }
+
+ @Override
+ public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+ {
+ return new CQLGroupByPagingLimits(pageSize,
+ groupPerPartitionLimit,
+ rowLimit,
+ groupBySpec,
+ state,
+ lastReturnedKey,
+ lastReturnedKeyRemaining);
+ }
+
+ @Override
+ public DataLimits forGroupByInternalPaging(GroupingState state)
+ {
+ return new CQLGroupByLimits(rowLimit,
+ groupPerPartitionLimit,
+ rowLimit,
+ groupBySpec,
+ state);
+ }
+
+ @Override
+ public Counter newCounter(int nowInSec, boolean assumeLiveData)
+ {
+ return new GroupByAwareCounter(nowInSec, assumeLiveData);
+ }
+
+ @Override
+ public int count()
+ {
+ return groupLimit;
+ }
+
+ @Override
+ public int perPartitionCount()
+ {
+ return groupPerPartitionLimit;
+ }
+
+ @Override
+ public DataLimits withoutState()
+ {
+ return state == GroupingState.EMPTY_STATE
+ ? this
+ : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ if (groupLimit != NO_LIMIT)
+ {
+ sb.append("GROUP LIMIT ").append(groupLimit);
+ if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT)
+ sb.append(' ');
+ }
+
+ if (groupPerPartitionLimit != NO_LIMIT)
+ {
+ sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit);
+ if (rowLimit != NO_LIMIT)
+ sb.append(' ');
+ }
+
+ if (rowLimit != NO_LIMIT)
+ {
+ sb.append("LIMIT ").append(rowLimit);
+ }
+
+ return sb.toString();
+ }
+
+ @Override
+ public boolean isExhausted(Counter counter)
+ {
+ return ((GroupByAwareCounter) counter).rowCounted < rowLimit
+ && counter.counted() < groupLimit;
+ }
+
+ protected class GroupByAwareCounter extends Counter
+ {
+ private final GroupMaker groupMaker;
+
+ /**
+ * The key of the partition being processed.
+ */
+ protected DecoratedKey currentPartitionKey;
+
+ /**
+ * The number of rows counted so far.
+ */
+ protected int rowCounted;
+
+ /**
+ * The number of rows counted so far in the current partition.
+ */
+ protected int rowCountedInCurrentPartition;
+
+ /**
+ * The number of groups counted so far. A group is counted only once it is complete
+ * (i.e. the next group, or the end of the data, has been reached).
+ */
+ protected int groupCounted;
+
+ /**
+ * The number of groups in the current partition.
+ */
+ protected int groupInCurrentPartition;
+
+ protected boolean hasGroupStarted;
+
+ protected boolean hasLiveStaticRow;
+
+ protected boolean hasReturnedRowsFromCurrentPartition;
+
+ /**
+ * Creates a counter whose group maker is initialized from the limits' <code>GroupingState</code>.
+ */
+ private GroupByAwareCounter(int nowInSec, boolean assumeLiveData)
+ {
+ super(nowInSec, assumeLiveData);
+ this.groupMaker = groupBySpec.newGroupMaker(state);
+
+ // If the end of the partition was reached at the same time as the row limit, the last group might
+ // not have been counted yet. Because of that, we need to guess from the state whether the previous
+ // group is still open.
+ hasGroupStarted = state.hasClustering();
+ }
+
+ /**
+ * Starts counting a new partition. If the partition starts a new group and a group was in progress,
+ * that previous group is counted first.
+ */
+ @Override
+ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
+ {
+ if (partitionKey.getKey().equals(state.partitionKey()))
+ {
+ // The only case where state.partitionKey() can be equal to the partition key
+ // is if some of the partition rows were returned in the previous page but the
+ // partition was not exhausted (as the state partition key has not been updated yet).
+ // Since we know rows were returned, the static row (if any) has already been accounted for,
+ // so force hasLiveStaticRow to false to make sure it is not counted
+ // once more.
+ hasLiveStaticRow = false;
+ hasReturnedRowsFromCurrentPartition = true;
+ hasGroupStarted = true;
+ }
+ else
+ {
+ // We need to increment our count of groups if we have reached a new one, but only if some
+ // content was added since we closed our last group (that is, only if hasGroupStarted). Note that we
+ // may get here with hasGroupStarted == false in the following cases:
+ // * the partition limit was reached for the previous partition
+ // * the previous partition contained only a static row
+ // * the rows of the last group of the previous partition were all marked as deleted
+ if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING))
+ {
+ incrementGroupCount();
+ // If we detect, before starting the new partition, that we are done, we need to increase
+ // the per-partition group count of the previous partition, as the next page will start from
+ // there.
+ if (isDone())
+ incrementGroupInCurrentPartitionCount();
+ hasGroupStarted = false;
+ }
+ hasReturnedRowsFromCurrentPartition = false;
+ hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
+ }
+ currentPartitionKey = partitionKey;
+ // If we are done, we need to preserve groupInCurrentPartition and rowCountedInCurrentPartition
+ // because the pager needs to retrieve the counts associated with the last value it has returned.
+ if (!isDone())
+ {
+ groupInCurrentPartition = 0;
+ rowCountedInCurrentPartition = 0;
+ }
+ }
+
+ @Override
+ protected Row applyToStatic(Row row)
+ {
+ // It's possible that we're "done" if the partition we just started bumped the number of groups (in
+ // applyToPartition() above), in which case Transformation will still call this method. In that case, we
+ // want to ignore the static row, it should (and will) be returned with the next page/group if needs be.
+ if (isDone())
+ {
+ hasLiveStaticRow = false; // The row has not been returned
+ return Rows.EMPTY_STATIC_ROW;
+ }
+ return row;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ // We want to check if the row belongs to a new group even if it has been deleted. The goal being
+ // to minimize the chances of having to go through the same data twice if we detect on the next
+ // non deleted row that we have reached the limit.
+ if (groupMaker.isNewGroup(currentPartitionKey, row.clustering()))
+ {
+ if (hasGroupStarted)
+ {
+ incrementGroupCount();
+ incrementGroupInCurrentPartitionCount();
+ }
+ hasGroupStarted = false;
+ }
+
+ // That row may have made us increment the group count, which may mean we're done for this partition, in
+ // which case we shouldn't count this row (it won't be returned).
+ if (isDoneForPartition())
+ {
+ hasGroupStarted = false;
+ return null;
+ }
+
+ if (isLive(row))
+ {
+ hasGroupStarted = true;
+ incrementRowCount();
+ hasReturnedRowsFromCurrentPartition = true;
+ }
+
+ return row;
+ }
+
+ @Override
+ public int counted()
+ {
+ return groupCounted;
+ }
+
+ @Override
+ public int countedInCurrentPartition()
+ {
+ return groupInCurrentPartition;
+ }
+
+ @Override
+ public int rowCounted()
+ {
+ return rowCounted;
+ }
+
+ @Override
+ public int rowCountedInCurrentPartition()
+ {
+ return rowCountedInCurrentPartition;
+ }
+
+ protected void incrementRowCount()
+ {
+ rowCountedInCurrentPartition++;
+ if (++rowCounted >= rowLimit)
+ stop();
+ }
+
+ private void incrementGroupCount()
+ {
+ groupCounted++;
+ if (groupCounted >= groupLimit)
+ stop();
+ }
+
+ private void incrementGroupInCurrentPartitionCount()
+ {
+ groupInCurrentPartition++;
+ if (groupInCurrentPartition >= groupPerPartitionLimit)
+ stopInPartition();
+ }
+
+ @Override
+ public boolean isDoneForPartition()
+ {
+ return isDone() || groupInCurrentPartition >= groupPerPartitionLimit;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return groupCounted >= groupLimit;
+ }
+
+ @Override
+ public void onPartitionClose()
+ {
+ // Normally we don't count static rows since, from a CQL point of view, they are merged with the other
+ // rows of the partition. However, if the partition only has the static row, that row will be returned
+ // as one group, so count it (as one row and one group).
+ if (hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
+ {
+ incrementRowCount();
+ incrementGroupCount();
+ incrementGroupInCurrentPartitionCount();
+ hasGroupStarted = false;
+ }
+ super.onPartitionClose();
+ }
+
+ @Override
+ public void onClose()
+ {
+ // Groups are only counted when the end of the group is reached.
+ // The end of a group is detected by 2 ways:
+ // 1) a new group is reached
+ // 2) the end of the data is reached
+ // We know that the end of the data is reached if the group limit has not been reached
+ // and the number of rows counted is smaller than the internal page size.
+ if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
+ {
+ incrementGroupCount();
+ incrementGroupInCurrentPartitionCount();
+ }
+
+ super.onClose();
+ }
+ }
+ }
+
+ private static class CQLGroupByPagingLimits extends CQLGroupByLimits
+ {
+ private final ByteBuffer lastReturnedKey;
+
+ private final int lastReturnedKeyRemaining;
+
+ public CQLGroupByPagingLimits(int groupLimit,
+ int groupPerPartitionLimit,
+ int rowLimit,
+ AggregationSpecification groupBySpec,
+ GroupingState state,
+ ByteBuffer lastReturnedKey,
+ int lastReturnedKeyRemaining)
+ {
+ super(groupLimit,
+ groupPerPartitionLimit,
+ rowLimit,
+ groupBySpec,
+ state);
+
+ this.lastReturnedKey = lastReturnedKey;
+ this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
+ }
+
+ @Override
+ public Kind kind()
+ {
+ return Kind.CQL_GROUP_BY_PAGING_LIMIT;
+ }
+
+ @Override
+ public DataLimits forPaging(int pageSize)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataLimits forGroupByInternalPaging(GroupingState state)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Counter newCounter(int nowInSec, boolean assumeLiveData)
+ {
+ assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey());
+ return new PagingGroupByAwareCounter(nowInSec, assumeLiveData);
+ }
+
+ @Override
+ public DataLimits withoutState()
+ {
+ return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
+ }
+
+ private class PagingGroupByAwareCounter extends GroupByAwareCounter
+ {
+ private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData)
+ {
+ super(nowInSec, assumeLiveData);
+ }
+
+ @Override
+ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
+ {
+ if (partitionKey.getKey().equals(lastReturnedKey))
+ {
+ currentPartitionKey = partitionKey;
+ groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining;
+ hasReturnedRowsFromCurrentPartition = true;
+ hasLiveStaticRow = false;
+ hasGroupStarted = state.hasClustering();
+ }
+ else
+ {
+ super.applyToPartition(partitionKey, staticRow);
+ }
+ }
+ }
+ }
+
+ /**
* Limits used by thrift; this count partition and cells.
*/
private static class ThriftLimits extends DataLimits
@@ -593,6 +1176,11 @@ public abstract class DataLimits
return cellPerPartitionLimit;
}
+ public DataLimits withoutState()
+ {
+ return this;
+ }
+
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// remember that getMeansColumns returns a number of cells: we should clean nomenclature
@@ -602,17 +1190,13 @@ public abstract class DataLimits
protected class ThriftCounter extends Counter
{
- protected final int nowInSec;
- protected final boolean assumeLiveData;
-
protected int partitionsCounted;
protected int cellsCounted;
protected int cellsInCurrentPartition;
public ThriftCounter(int nowInSec, boolean assumeLiveData)
{
- this.nowInSec = nowInSec;
- this.assumeLiveData = assumeLiveData;
+ super(nowInSec, assumeLiveData);
}
@Override
@@ -656,6 +1240,16 @@ public abstract class DataLimits
return cellsInCurrentPartition;
}
+ public int rowCounted()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int rowCountedInCurrentPartition()
+ {
+ throw new UnsupportedOperationException();
+ }
+
public boolean isDone()
{
return partitionsCounted >= partitionLimit;
@@ -723,7 +1317,7 @@ public abstract class DataLimits
public Row applyToRow(Row row)
{
// In the internal format, a row == a super column, so that's what we want to count.
- if (assumeLiveData || row.hasLiveData(nowInSec))
+ if (isLive(row))
{
++cellsCounted;
if (++cellsInCurrentPartition >= cellPerPartitionLimit)
@@ -736,7 +1330,7 @@ public abstract class DataLimits
public static class Serializer
{
- public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException
+ public void serialize(DataLimits limits, DataOutputPlus out, int version, ClusteringComparator comparator) throws IOException
{
out.writeByte(limits.kind().ordinal());
switch (limits.kind())
@@ -754,6 +1348,25 @@ public abstract class DataLimits
out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
}
break;
+ case CQL_GROUP_BY_LIMIT:
+ case CQL_GROUP_BY_PAGING_LIMIT:
+ CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
+ out.writeUnsignedVInt(groupByLimits.groupLimit);
+ out.writeUnsignedVInt(groupByLimits.groupPerPartitionLimit);
+ out.writeUnsignedVInt(groupByLimits.rowLimit);
+
+ AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
+ AggregationSpecification.serializer.serialize(groupBySpec, out, version);
+
+ GroupingState.serializer.serialize(groupByLimits.state, out, version, comparator);
+
+ if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
+ {
+ CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
+ ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
+ out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
+ }
+ break;
case THRIFT_LIMIT:
case SUPER_COLUMN_COUNTING_LIMIT:
ThriftLimits thriftLimits = (ThriftLimits)limits;
@@ -763,54 +1376,102 @@ public abstract class DataLimits
}
}
- public DataLimits deserialize(DataInputPlus in, int version) throws IOException
+ public DataLimits deserialize(DataInputPlus in, int version, ClusteringComparator comparator) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedByte()];
switch (kind)
{
case CQL_LIMIT:
case CQL_PAGING_LIMIT:
- int rowLimit = (int)in.readUnsignedVInt();
- int perPartitionLimit = (int)in.readUnsignedVInt();
+ {
+ int rowLimit = (int) in.readUnsignedVInt();
+ int perPartitionLimit = (int) in.readUnsignedVInt();
boolean isDistinct = in.readBoolean();
if (kind == Kind.CQL_LIMIT)
return cqlLimits(rowLimit, perPartitionLimit, isDistinct);
-
ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
- int lastRemaining = (int)in.readUnsignedVInt();
+ int lastRemaining = (int) in.readUnsignedVInt();
return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
+ }
+ case CQL_GROUP_BY_LIMIT:
+ case CQL_GROUP_BY_PAGING_LIMIT:
+ {
+ int groupLimit = (int) in.readUnsignedVInt();
+ int groupPerPartitionLimit = (int) in.readUnsignedVInt();
+ int rowLimit = (int) in.readUnsignedVInt();
+
+ AggregationSpecification groupBySpec = AggregationSpecification.serializer.deserialize(in, version, comparator);
+
+ GroupingState state = GroupingState.serializer.deserialize(in, version, comparator);
+
+ if (kind == Kind.CQL_GROUP_BY_LIMIT)
+ return new CQLGroupByLimits(groupLimit,
+ groupPerPartitionLimit,
+ rowLimit,
+ groupBySpec,
+ state);
+
+ ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
+ int lastRemaining = (int) in.readUnsignedVInt();
+ return new CQLGroupByPagingLimits(groupLimit,
+ groupPerPartitionLimit,
+ rowLimit,
+ groupBySpec,
+ state,
+ lastKey,
+ lastRemaining);
+ }
case THRIFT_LIMIT:
case SUPER_COLUMN_COUNTING_LIMIT:
- int partitionLimit = (int)in.readUnsignedVInt();
- int cellPerPartitionLimit = (int)in.readUnsignedVInt();
+ int partitionLimit = (int) in.readUnsignedVInt();
+ int cellPerPartitionLimit = (int) in.readUnsignedVInt();
return kind == Kind.THRIFT_LIMIT
- ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
- : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
+ ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
+ : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
}
throw new AssertionError();
}
- public long serializedSize(DataLimits limits, int version)
+ public long serializedSize(DataLimits limits, int version, ClusteringComparator comparator)
{
- long size = TypeSizes.sizeof((byte)limits.kind().ordinal());
+ long size = TypeSizes.sizeof((byte) limits.kind().ordinal());
switch (limits.kind())
{
case CQL_LIMIT:
case CQL_PAGING_LIMIT:
- CQLLimits cqlLimits = (CQLLimits)limits;
+ CQLLimits cqlLimits = (CQLLimits) limits;
size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
size += TypeSizes.sizeof(cqlLimits.isDistinct);
if (limits.kind() == Kind.CQL_PAGING_LIMIT)
{
- CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
+ CQLPagingLimits pagingLimits = (CQLPagingLimits) cqlLimits;
+ size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
+ size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
+ }
+ break;
+ case CQL_GROUP_BY_LIMIT:
+ case CQL_GROUP_BY_PAGING_LIMIT:
+ CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
+ size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupLimit);
+ size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupPerPartitionLimit);
+ size += TypeSizes.sizeofUnsignedVInt(groupByLimits.rowLimit);
+
+ AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
+ size += AggregationSpecification.serializer.serializedSize(groupBySpec, version);
+
+ size += GroupingState.serializer.serializedSize(groupByLimits.state, version, comparator);
+
+ if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
+ {
+ CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
}
break;
case THRIFT_LIMIT:
case SUPER_COLUMN_COUNTING_LIMIT:
- ThriftLimits thriftLimits = (ThriftLimits)limits;
+ ThriftLimits thriftLimits = (ThriftLimits) limits;
size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 2c1b347..b9ae933 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.DataLimits.Counter;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.MoreRows;
@@ -436,9 +437,9 @@ public class DataResolver extends ResponseResolver
// Also note that we only get here once all the results for this node have been returned, and so
// if the node had returned the requested number but we still get there, it imply some results were
// skipped during reconciliation.
- if (lastCount == counter.counted() || !counter.isDoneForPartition())
+ if (lastCount == counted(counter) || !counter.isDoneForPartition())
return null;
- lastCount = counter.counted();
+ lastCount = counted(counter);
assert !postReconciliationCounter.isDoneForPartition();
@@ -450,8 +451,8 @@ public class DataResolver extends ResponseResolver
// we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
// Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
// counting iterator.
- int n = postReconciliationCounter.countedInCurrentPartition();
- int x = counter.countedInCurrentPartition();
+ int n = countedInCurrentPartition(postReconciliationCounter);
+ int x = countedInCurrentPartition(counter);
int toQuery = Math.max(((n * n) / x) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
@@ -468,6 +469,38 @@ public class DataResolver extends ResponseResolver
return doShortReadRetry(cmd);
}
+ /**
+ * Returns the number of rows counted by the counter.
+ * <p>For GROUP BY queries, {@code Counter.counted()} returns a number of groups, so
+ * {@code Counter.rowCounted()} is used instead; otherwise {@code Counter.counted()} is returned.</p>
+ *
+ * @param counter the counter.
+ * @return the number of rows counted by the counter
+ */
+ private int counted(Counter counter)
+ {
+ // We are interested in the number of rows, but for GROUP BY queries 'counted' returns the number of
+ // groups.
+ if (command.limits().isGroupByLimit())
+ return counter.rowCounted();
+
+ return counter.counted();
+ }
+
+ /**
+ * Returns the number of rows counted by the counter in the current partition.
+ * <p>For GROUP BY queries, {@code Counter.countedInCurrentPartition()} returns a number of groups, so
+ * {@code Counter.rowCountedInCurrentPartition()} is used instead.</p>
+ *
+ * @param counter the counter.
+ * @return the number of rows counted in the current partition by the counter
+ */
+ private int countedInCurrentPartition(Counter counter)
+ {
+ // We are interested in the number of rows, but for GROUP BY queries 'countedInCurrentPartition' returns
+ // the number of groups in the current partition.
+ if (command.limits().isGroupByLimit())
+ return counter.rowCountedInCurrentPartition();
+
+ return counter.countedInCurrentPartition();
+ }
+
private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
{
DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2839259..bc8c46c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2080,9 +2080,18 @@ public class StorageProxy implements StorageProxyMBean
rowsPerRange, (int) remainingRows, concurrencyFactor);
}
- private SingleRangeResponse query(RangeForQuery toQuery)
+ /**
+ * Queries the provided sub-range.
+ *
+ * @param toQuery the subRange to query.
+ * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
+ * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the
+ * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
+ * that it's the query that "continues" whatever we're previously queried).
+ */
+ private SingleRangeResponse query(RangeForQuery toQuery, boolean isFirst)
{
- PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+ PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst);
DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
@@ -2115,7 +2124,7 @@ public class StorageProxy implements StorageProxyMBean
List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
{
- concurrentQueries.add(query(ranges.next()));
+ concurrentQueries.add(query(ranges.next(), i == 0));
++rangesQueried;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 06252ef..01a56c4 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -22,8 +22,6 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.ClientState;
abstract class AbstractQueryPager implements QueryPager
@@ -57,23 +55,25 @@ abstract class AbstractQueryPager implements QueryPager
return command.executionController();
}
- public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ /**
+ * Fetches the next page using the given consistency level. The requested page size is capped by the number
+ * of results remaining, and an empty iterator is returned once this pager is exhausted.
+ */
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState)
 {
 if (isExhausted())
 return EmptyIterators.partition();
 pageSize = Math.min(pageSize, remaining);
 Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+
 return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
 }
- public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
+ /**
+ * Same as {@link #fetchPage(int, ConsistencyLevel, ClientState)}, but the page is read through
+ * {@code executeInternal} with the given execution controller.
+ */
+ public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
 {
 if (isExhausted())
 return EmptyIterators.partition();
 pageSize = Math.min(pageSize, remaining);
 Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+
 return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager);
 }
@@ -81,6 +81,7 @@ abstract class AbstractQueryPager implements QueryPager
{
private final DataLimits pageLimits;
private final DataLimits.Counter counter;
+ private DecoratedKey currentKey;
private Row lastRow;
private boolean isFirstPartition = true;
@@ -93,10 +94,7 @@ abstract class AbstractQueryPager implements QueryPager
@Override
public RowIterator applyToPartition(RowIterator partition)
{
- DecoratedKey key = partition.partitionKey();
- if (lastKey == null || !lastKey.equals(key))
- remainingInPartition = limits.perPartitionCount();
- lastKey = key;
+ currentKey = partition.partitionKey();
// If this is the first partition of this page, this could be the continuation of a partition we've started
// on the previous page. In which case, we could have the problem that the partition has no more "regular"
@@ -106,7 +104,7 @@ abstract class AbstractQueryPager implements QueryPager
if (isFirstPartition)
{
isFirstPartition = false;
- if (isPreviouslyReturnedPartition(key) && !partition.hasNext())
+ if (isPreviouslyReturnedPartition(currentKey) && !partition.hasNext())
{
partition.close();
return null;
@@ -119,10 +117,12 @@ abstract class AbstractQueryPager implements QueryPager
@Override
public void onClose()
{
+ // In some cases, like GROUP BY, the counter needs to know when processing has completed.
+ counter.onClose();
+
recordLast(lastKey, lastRow);
- int counted = counter.counted();
- remaining -= counted;
+ remaining -= counter.counted();
// If the clustering of the last row returned is a static one, it means that the partition was only
// containing data within the static columns. If the clustering of the last row returned is empty
// it means that there is only one row per partition. Therefore, in both cases there are no data remaining
@@ -136,19 +136,28 @@ abstract class AbstractQueryPager implements QueryPager
{
remainingInPartition -= counter.countedInCurrentPartition();
}
- exhausted = counted < pageLimits.count();
+ exhausted = pageLimits.isExhausted(counter);
}
public Row applyToStatic(Row row)
{
if (!row.isEmpty())
+ {
+ remainingInPartition = limits.perPartitionCount();
+ lastKey = currentKey;
lastRow = row;
+ }
return row;
}
 @Override
 public Row applyToRow(Row row)
 {
+ // Entering a partition different from the last one recorded: reset the per-partition budget.
+ if (!currentKey.equals(lastKey))
+ {
+ remainingInPartition = limits.perPartitionCount();
+ lastKey = currentKey;
+ }
 lastRow = row;
 return row;
 }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
new file mode 100644
index 0000000..1bdaac6
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.aggregation.GroupingState;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * {@code QueryPager} that takes care of fetching the pages for aggregation queries.
+ * <p>
+ * For aggregation/group by queries, the user page size is in number of groups. But each group could be composed of very
+ * many rows so to avoid running into OOMs, this pager will page internal queries into sub-pages. So each call to
+ * {@link #fetchPage} may (transparently) yield multiple internal queries (sub-pages).
+ */
+public final class AggregationQueryPager implements QueryPager
+{
+ private final DataLimits limits;
+
+ // The sub-pager, used to retrieve the next sub-page.
+ private QueryPager subPager;
+
+ public AggregationQueryPager(QueryPager subPager, DataLimits limits)
+ {
+ this.subPager = subPager;
+ this.limits = limits;
+ }
+
+ @Override
+ public PartitionIterator fetchPage(int pageSize,
+ ConsistencyLevel consistency,
+ ClientState clientState)
+ {
+ if (limits.isGroupByLimit())
+ return new GroupByPartitionIterator(pageSize, consistency, clientState);
+
+ return new AggregationPartitionIterator(pageSize, consistency, clientState);
+ }
+
+ @Override
+ public ReadExecutionController executionController()
+ {
+ return subPager.executionController();
+ }
+
+ @Override
+ public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
+ {
+ if (limits.isGroupByLimit())
+ return new GroupByPartitionIterator(pageSize, executionController);
+
+ return new AggregationPartitionIterator(pageSize, executionController);
+ }
+
+ @Override
+ public boolean isExhausted()
+ {
+ return subPager.isExhausted();
+ }
+
+ @Override
+ public int maxRemaining()
+ {
+ return subPager.maxRemaining();
+ }
+
+ @Override
+ public PagingState state()
+ {
+ return subPager.state();
+ }
+
+ @Override
+ public QueryPager withUpdatedLimit(DataLimits newLimits)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * <code>PartitionIterator</code> that automatically fetches a new sub-page of data if needed when the current iterator is
+ * exhausted.
+ */
+ public class GroupByPartitionIterator implements PartitionIterator
+ {
+ /**
+ * The top-level page size in number of groups.
+ */
+ private final int pageSize;
+
+ // For "normal" queries
+ private final ConsistencyLevel consistency;
+ private final ClientState clientState;
+
+ // For internal queries
+ private final ReadExecutionController executionController;
+
+ /**
+ * The <code>PartitionIterator</code> over the last page retrieved.
+ */
+ private PartitionIterator partitionIterator;
+
+ /**
+ * The next <code>RowIterator</code> to be returned.
+ */
+ private RowIterator next;
+
+ /**
+ * Whether all the data has been returned.
+ */
+ private boolean endOfData;
+
+ /**
+ * Whether the partitionIterator has been closed.
+ */
+ private boolean closed;
+
+ /**
+ * The key of the last partition processed.
+ */
+ private ByteBuffer lastPartitionKey;
+
+ /**
+ * The clustering of the last row processed
+ */
+ private Clustering lastClustering;
+
+ /**
+ * The number of rows the sub-pager had remaining when the first sub-page was fetched.
+ */
+ private int initialMaxRemaining;
+
+ public GroupByPartitionIterator(int pageSize,
+ ConsistencyLevel consistency,
+ ClientState clientState)
+ {
+ this(pageSize, consistency, clientState, null);
+ }
+
+ public GroupByPartitionIterator(int pageSize,
+ ReadExecutionController executionController)
+ {
+ this(pageSize, null, null, executionController);
+ }
+
+ private GroupByPartitionIterator(int pageSize,
+ ConsistencyLevel consistency,
+ ClientState clientState,
+ ReadExecutionController executionController)
+ {
+ this.pageSize = handlePagingOff(pageSize);
+ this.consistency = consistency;
+ this.clientState = clientState;
+ this.executionController = executionController;
+ }
+
+ private int handlePagingOff(int pageSize)
+ {
+ // If the paging is off, the pageSize will be <= 0. So we need to replace
+ // it by DataLimits.NO_LIMIT
+ return pageSize <= 0 ? DataLimits.NO_LIMIT : pageSize;
+ }
+
+ public final void close()
+ {
+ if (!closed)
+ {
+ closed = true;
+ // partitionIterator is created lazily by hasNext(), so it may not exist yet.
+ if (partitionIterator != null)
+ partitionIterator.close();
+ }
+ }
+
+ public final boolean hasNext()
+ {
+ if (endOfData)
+ return false;
+
+ if (next != null)
+ return true;
+
+ fetchNextRowIterator();
+
+ return next != null;
+ }
+
+ /**
+ * Loads the next <code>RowIterator</code> to be returned.
+ */
+ private void fetchNextRowIterator()
+ {
+ if (partitionIterator == null)
+ {
+ initialMaxRemaining = subPager.maxRemaining();
+ partitionIterator = fetchSubPage(pageSize);
+ }
+
+ while (!partitionIterator.hasNext())
+ {
+ partitionIterator.close();
+
+ int counted = initialMaxRemaining - subPager.maxRemaining();
+
+ if (isDone(pageSize, counted) || subPager.isExhausted())
+ {
+ endOfData = true;
+ closed = true;
+ return;
+ }
+
+ subPager = updatePagerLimit(subPager, limits, lastPartitionKey, lastClustering);
+ partitionIterator = fetchSubPage(computeSubPageSize(pageSize, counted));
+ }
+
+ next = partitionIterator.next();
+ }
+
+ protected boolean isDone(int pageSize, int counted)
+ {
+ return counted == pageSize;
+ }
+
+ /**
+ * Updates the pager with the new limits if needed.
+ *
+ * @param pager the pager previously used
+ * @param limits the DataLimits
+ * @param lastPartitionKey the partition key of the last row returned
+ * @param lastClustering the clustering of the last row returned
+ * @return the pager to use to query the next page of data
+ */
+ protected QueryPager updatePagerLimit(QueryPager pager,
+ DataLimits limits,
+ ByteBuffer lastPartitionKey,
+ Clustering lastClustering)
+ {
+ GroupingState state = new GroupingState(lastPartitionKey, lastClustering);
+ DataLimits newLimits = limits.forGroupByInternalPaging(state);
+ return pager.withUpdatedLimit(newLimits);
+ }
+
+ /**
+ * Computes the size of the next sub-page to retrieve.
+ *
+ * @param pageSize the top-level page size
+ * @param counted the number of results returned so far by the previous sub-pages
+ * @return the size of the next sub-page to retrieve
+ */
+ protected int computeSubPageSize(int pageSize, int counted)
+ {
+ return pageSize - counted;
+ }
+
+ /**
+ * Fetches the next sub-page.
+ *
+ * @param subPageSize the sub-page size in number of groups
+ * @return the next sub-page
+ */
+ private final PartitionIterator fetchSubPage(int subPageSize)
+ {
+ return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState)
+ : subPager.fetchPageInternal(subPageSize, executionController);
+ }
+
+ public final RowIterator next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ RowIterator iterator = new GroupByRowIterator(next);
+ lastPartitionKey = iterator.partitionKey().getKey();
+ next = null;
+ return iterator;
+ }
+
+ private class GroupByRowIterator implements RowIterator
+ {
+ /**
+ * The decorated <code>RowIterator</code>.
+ */
+ private RowIterator rowIterator;
+
+ /**
+ * Whether the decorated iterator has been closed.
+ */
+ private boolean closed;
+
+ public GroupByRowIterator(RowIterator delegate)
+ {
+ this.rowIterator = delegate;
+ }
+
+ public CFMetaData metadata()
+ {
+ return rowIterator.metadata();
+ }
+
+ public boolean isReverseOrder()
+ {
+ return rowIterator.isReverseOrder();
+ }
+
+ public PartitionColumns columns()
+ {
+ return rowIterator.columns();
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return rowIterator.partitionKey();
+ }
+
+ public Row staticRow()
+ {
+ Row row = rowIterator.staticRow();
+ lastClustering = null;
+ return row;
+ }
+
+ public boolean isEmpty()
+ {
+ return this.rowIterator.isEmpty() && !hasNext();
+ }
+
+ public void close()
+ {
+ if (!closed)
+ {
+ closed = true;
+ rowIterator.close();
+ }
+ }
+
+ public boolean hasNext()
+ {
+ // Once closed, the delegate must not be touched again.
+ if (closed)
+ return false;
+
+ if (rowIterator.hasNext())
+ return true;
+
+ DecoratedKey partitionKey = rowIterator.partitionKey();
+
+ rowIterator.close();
+
+ // Fetch the next RowIterator
+ GroupByPartitionIterator.this.hasNext();
+
+ // if the previous page was ending within the partition the
+ // next RowIterator is the continuation of this one
+ if (next != null && partitionKey.equals(next.partitionKey()))
+ {
+ rowIterator = next;
+ next = null;
+ return rowIterator.hasNext();
+ }
+
+ closed = true;
+ return false;
+ }
+
+ public Row next()
+ {
+ Row row = this.rowIterator.next();
+ lastClustering = row.clustering();
+ return row;
+ }
+ }
+ }
+
+ /**
+ * <code>PartitionIterator</code> for queries without Group By but with aggregates.
+ * <p>For maintaining backward compatibility we are forced to use the {@link DataLimits.CQLLimits} instead of the
+ * {@link DataLimits.CQLGroupByLimits}. Due to that pages need to be fetched in a different way.</p>
+ */
+ public final class AggregationPartitionIterator extends GroupByPartitionIterator
+ {
+ public AggregationPartitionIterator(int pageSize,
+ ConsistencyLevel consistency,
+ ClientState clientState)
+ {
+ super(pageSize, consistency, clientState);
+ }
+
+ public AggregationPartitionIterator(int pageSize,
+ ReadExecutionController executionController)
+ {
+ super(pageSize, executionController);
+ }
+
+ @Override
+ protected QueryPager updatePagerLimit(QueryPager pager,
+ DataLimits limits,
+ ByteBuffer lastPartitionKey,
+ Clustering lastClustering)
+ {
+ return pager;
+ }
+
+ @Override
+ protected boolean isDone(int pageSize, int counted)
+ {
+ return false;
+ }
+
+ @Override
+ protected int computeSubPageSize(int pageSize, int counted)
+ {
+ return pageSize;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 57d6c62..9670f28 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.service.pager;
import org.apache.cassandra.utils.AbstractIterator;
+import java.util.Arrays;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.DataLimits;
@@ -81,6 +83,27 @@ public class MultiPartitionPager implements QueryPager
remaining = state == null ? limit.count() : state.remaining;
}
+ /**
+ * Creates a pager over {@code pagers} that resumes from the sub-pager at index {@code current} with
+ * {@code remaining} results left to return. Used by {@link #withUpdatedLimit(DataLimits)}.
+ */
+ private MultiPartitionPager(SinglePartitionPager[] pagers, DataLimits limit, int nowInSec, int remaining, int current)
+ {
+ this.pagers = pagers;
+ this.limit = limit;
+ this.nowInSec = nowInSec;
+ this.remaining = remaining;
+ this.current = current;
+ }
+
+ /**
+ * Returns a copy of this pager that uses {@code newLimits}. Only the current sub-pager (the one the next page is
+ * fetched from) is rebuilt with the new limits; the other sub-pagers are shared with this pager as-is.
+ * NOTE(review): presumably only called while {@code current} points to a non-exhausted pager.
+ */
+ @Override
+ public QueryPager withUpdatedLimit(DataLimits newLimits)
+ {
+ SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length);
+ newPagers[current] = newPagers[current].withUpdatedLimit(newLimits);
+
+ return new MultiPartitionPager(newPagers,
+ newLimits,
+ nowInSec,
+ remaining,
+ current);
+ }
+
public PagingState state()
{
// Sets current to the first non-exhausted pager
@@ -122,27 +145,21 @@ public class MultiPartitionPager implements QueryPager
public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
- PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
- DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
- iter.setCounter(counter);
- return counter.applyTo(iter);
+ return new PagersIterator(toQuery, consistency, clientState, null);
}
@SuppressWarnings("resource") // iter closed via countingIter
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
- PagersIterator iter = new PagersIterator(toQuery, null, null, executionController);
- DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
- iter.setCounter(counter);
- return counter.applyTo(iter);
+ return new PagersIterator(toQuery, null, null, executionController);
}
private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
{
private final int pageSize;
private PartitionIterator result;
- private DataLimits.Counter counter;
+ private boolean closed;
// For "normal" queries
private final ConsistencyLevel consistency;
@@ -151,6 +168,9 @@ public class MultiPartitionPager implements QueryPager
// For internal queries
private final ReadExecutionController executionController;
+ private int pagerMaxRemaining;
+ private int counted;
+
public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController)
{
this.pageSize = pageSize;
@@ -159,23 +179,30 @@ public class MultiPartitionPager implements QueryPager
this.executionController = executionController;
}
- public void setCounter(DataLimits.Counter counter)
- {
- this.counter = counter;
- }
-
protected RowIterator computeNext()
{
while (result == null || !result.hasNext())
{
if (result != null)
+ {
result.close();
-
- // This sets us on the first non-exhausted pager
- if (isExhausted())
+ counted += pagerMaxRemaining - pagers[current].maxRemaining();
+ }
+
+ // We are done if we have reached the page size or in the case of GROUP BY if the current pager
+ // is not exhausted.
+ boolean isDone = counted >= pageSize
+ || (result != null && limit.isGroupByLimit() && !pagers[current].isExhausted());
+
+ // isExhausted() will set us on the first non-exhausted pager
+ if (isDone || isExhausted())
+ {
+ closed = true;
return endOfData();
+ }
- int toQuery = pageSize - counter.counted();
+ pagerMaxRemaining = pagers[current].maxRemaining();
+ int toQuery = pageSize - counted;
result = consistency == null
? pagers[current].fetchPageInternal(toQuery, executionController)
: pagers[current].fetchPage(toQuery, consistency, clientState);
@@ -185,8 +212,8 @@ public class MultiPartitionPager implements QueryPager
public void close()
{
- remaining -= counter.counted();
- if (result != null)
+ remaining -= counted;
+ if (result != null && !closed)
result.close();
}
}
@@ -195,4 +222,4 @@ public class MultiPartitionPager implements QueryPager
{
return remaining;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 9c216e3..5a7cccf 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -19,9 +19,6 @@ package org.apache.cassandra.service.pager;
import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.rows.Row;
@@ -38,8 +35,6 @@ import org.apache.cassandra.schema.IndexMetadata;
*/
public class PartitionRangeQueryPager extends AbstractQueryPager
{
- private static final Logger logger = LoggerFactory.getLogger(PartitionRangeQueryPager.class);
-
private volatile DecoratedKey lastReturnedKey;
private volatile PagingState.RowMark lastReturnedRow;
@@ -55,6 +50,29 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
}
}
+ /**
+ * Creates a pager for {@code command} that resumes from the given paging position: the last returned partition
+ * key and row, plus the number of results still to return overall and within the current partition.
+ * Used, e.g., by {@link #withUpdatedLimit(DataLimits)}.
+ */
+ public PartitionRangeQueryPager(ReadCommand command,
+ int protocolVersion,
+ DecoratedKey lastReturnedKey,
+ PagingState.RowMark lastReturnedRow,
+ int remaining,
+ int remainingInPartition)
+ {
+ super(command, protocolVersion);
+ this.lastReturnedKey = lastReturnedKey;
+ this.lastReturnedRow = lastReturnedRow;
+ restoreState(lastReturnedKey, remaining, remainingInPartition);
+ }
+
+ /**
+ * Returns a new pager for this pager's command with its limits replaced by {@code newLimits}, resuming from this
+ * pager's current position (last returned key/row and remaining counts).
+ */
+ @Override
+ public PartitionRangeQueryPager withUpdatedLimit(DataLimits newLimits)
+ {
+ return new PartitionRangeQueryPager(command.withUpdatedLimit(newLimits),
+ protocolVersion,
+ lastReturnedKey,
+ lastReturnedRow,
+ maxRemaining(),
+ remainingInPartition());
+ }
+
public PagingState state()
{
return lastReturnedKey == null
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index e2d7f5e..edd2a55 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service.pager;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.EmptyIterators;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.RequestExecutionException;
@@ -77,6 +78,11 @@ public interface QueryPager
{
return null;
}
+
+ public QueryPager withUpdatedLimit(DataLimits newLimits)
+ {
+ throw new UnsupportedOperationException();
+ }
};
/**
@@ -134,4 +140,12 @@ public interface QueryPager
* beginning. If the pager is exhausted, the result is undefined.
*/
public PagingState state();
+
+ /**
+ * Creates a new <code>QueryPager</code> that uses the new limits.
+ * <p>
+ * Pagers that do not support changing their limits may throw {@code UnsupportedOperationException}.
+ *
+ * @param newLimits the new limits
+ * @return a new <code>QueryPager</code> that uses the new limits
+ */
+ public QueryPager withUpdatedLimit(DataLimits newLimits);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index acb55bb..59b2a51 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -19,9 +19,6 @@ package org.apache.cassandra.service.pager;
import java.nio.ByteBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
@@ -33,8 +30,6 @@ import org.apache.cassandra.db.filter.*;
*/
public class SinglePartitionPager extends AbstractQueryPager
{
- private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
-
private final SinglePartitionReadCommand command;
private volatile PagingState.RowMark lastReturned;
@@ -51,6 +46,28 @@ public class SinglePartitionPager extends AbstractQueryPager
}
}
+ /**
+ * Creates a pager for {@code command} that resumes from the given paging position: {@code rowMark} marks the
+ * last returned row, and {@code remaining}/{@code remainingInPartition} are the counts still to return.
+ */
+ private SinglePartitionPager(SinglePartitionReadCommand command,
+ int protocolVersion,
+ PagingState.RowMark rowMark,
+ int remaining,
+ int remainingInPartition)
+ {
+ super(command, protocolVersion);
+ this.command = command;
+ this.lastReturned = rowMark;
+ restoreState(command.partitionKey(), remaining, remainingInPartition);
+ }
+
+ /**
+ * Returns a new pager for this pager's command with its limits replaced by {@code newLimits}, resuming from
+ * this pager's current position (last returned row and remaining counts).
+ */
+ @Override
+ public SinglePartitionPager withUpdatedLimit(DataLimits newLimits)
+ {
+ return new SinglePartitionPager(command.withUpdatedLimit(newLimits),
+ protocolVersion,
+ lastReturned,
+ maxRemaining(),
+ remainingInPartition());
+ }
+
public ByteBuffer key()
{
return command.partitionKey().getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 19e40d2..d4bc40b 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -717,7 +717,7 @@ public abstract class CQLTester
return formatQuery(KEYSPACE, query);
}
- protected String formatQuery(String keyspace, String query)
+ protected final String formatQuery(String keyspace, String query)
{
String currentTable = currentTable();
return currentTable == null ? query : String.format(query, keyspace + "." + currentTable);
@@ -771,6 +771,11 @@ public abstract class CQLTester
return rs;
}
+ /**
+ * Asserts the rows of {@code result} using the last protocol version listed in {@code PROTOCOL_VERSIONS}.
+ */
+ protected void assertRowsNet(ResultSet result, Object[]... rows)
+ {
+ int lastProtocolVersion = PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1);
+ assertRowsNet(lastProtocolVersion, result, rows);
+ }
+
protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows)
{
// necessary as we need cluster objects to supply CodecRegistry.