You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/08/04 15:13:51 UTC

[3/4] cassandra git commit: Add support for GROUP BY to SELECT statement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 85cae0c..abe029b 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.aggregation.GroupMaker;
+import org.apache.cassandra.db.aggregation.GroupingState;
+import org.apache.cassandra.db.aggregation.AggregationSpecification;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.transform.BasePartitions;
@@ -68,7 +71,7 @@ public abstract class DataLimits
     // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
     public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
 
-    public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
+    public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT, CQL_GROUP_BY_LIMIT, CQL_GROUP_BY_PAGING_LIMIT }
 
     public static DataLimits cqlLimits(int cqlRowLimit)
     {
@@ -89,6 +92,14 @@ public abstract class DataLimits
              : new CQLLimits(cqlRowLimit, perPartitionLimit, isDistinct);
     }
 
+    public static DataLimits groupByLimits(int groupLimit,
+                                           int groupPerPartitionLimit,
+                                           int rowLimit,
+                                           AggregationSpecification groupBySpec)
+    {
+        return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
+    }
+
     public static DataLimits distinctLimits(int cqlRowLimit)
     {
         return CQLLimits.distinct(cqlRowLimit);
@@ -109,11 +120,32 @@ public abstract class DataLimits
     public abstract boolean isUnlimited();
     public abstract boolean isDistinct();
 
+    public boolean isGroupByLimit()
+    {
+        return false;
+    }
+
+    public boolean isExhausted(Counter counter)
+    {
+        return counter.counted() < count();
+    }
+
     public abstract DataLimits forPaging(int pageSize);
     public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
 
     public abstract DataLimits forShortReadRetry(int toFetch);
 
+    /**
+     * Creates a <code>DataLimits</code> instance to be used for paginating internally GROUP BY queries.
+     *
+     * @param state the <code>GroupMaker</code> state
+     * @return a <code>DataLimits</code> instance to be used for paginating internally GROUP BY queries
+     */
+    public DataLimits forGroupByInternalPaging(GroupingState state)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec);
 
     /**
@@ -139,6 +171,12 @@ public abstract class DataLimits
 
     public abstract int perPartitionCount();
 
+    /**
+     * Returns equivalent limits but where any internal state kept to track where we are of paging and/or grouping is
+     * discarded.
+     */
+    public abstract DataLimits withoutState();
+
     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
     {
         return this.newCounter(nowInSec, false).applyTo(iter);
@@ -162,9 +200,18 @@ public abstract class DataLimits
 
     public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
     {
+        protected final int nowInSec;
+        protected final boolean assumeLiveData;
+
         // false means we do not propagate our stop signals onto the iterator, we only count
         private boolean enforceLimits = true;
 
+        protected Counter(int nowInSec, boolean assumeLiveData)
+        {
+            this.nowInSec = nowInSec;
+            this.assumeLiveData = assumeLiveData;
+        }
+
         public Counter onlyCount()
         {
             this.enforceLimits = false;
@@ -199,11 +246,31 @@ public abstract class DataLimits
          * @return the number of results counted.
          */
         public abstract int counted();
+
         public abstract int countedInCurrentPartition();
 
+        /**
+         * The number of rows counted.
+         *
+         * @return the number of rows counted.
+         */
+        public abstract int rowCounted();
+
+        /**
+         * The number of rows counted in the current partition.
+         *
+         * @return the number of rows counted in the current partition.
+         */
+        public abstract int rowCountedInCurrentPartition();
+
         public abstract boolean isDone();
         public abstract boolean isDoneForPartition();
 
+        protected boolean isLive(Row row)
+        {
+            return assumeLiveData || row.hasLiveData(nowInSec);
+        }
+
         @Override
         protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
         {
@@ -232,6 +299,12 @@ public abstract class DataLimits
             if (isDoneForPartition())
                 stopInPartition();
         }
+
+        @Override
+        public void onClose()
+        {
+            super.onClose();
+        }
     }
 
     /**
@@ -341,6 +414,11 @@ public abstract class DataLimits
             return perPartitionLimit;
         }
 
+        public DataLimits withoutState()
+        {
+            return this;
+        }
+
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
             // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
@@ -351,9 +429,6 @@ public abstract class DataLimits
 
         protected class CQLCounter extends Counter
         {
-            protected final int nowInSec;
-            protected final boolean assumeLiveData;
-
             protected int rowCounted;
             protected int rowInCurrentPartition;
 
@@ -361,21 +436,20 @@ public abstract class DataLimits
 
             public CQLCounter(int nowInSec, boolean assumeLiveData)
             {
-                this.nowInSec = nowInSec;
-                this.assumeLiveData = assumeLiveData;
+                super(nowInSec, assumeLiveData);
             }
 
             @Override
             public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
             {
                 rowInCurrentPartition = 0;
-                hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec));
+                hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
             }
 
             @Override
             public Row applyToRow(Row row)
             {
-                if (assumeLiveData || row.hasLiveData(nowInSec))
+                if (isLive(row))
                     incrementRowCount();
                 return row;
             }
@@ -391,7 +465,7 @@ public abstract class DataLimits
                 super.onPartitionClose();
             }
 
-            private void incrementRowCount()
+            protected void incrementRowCount()
             {
                 if (++rowCounted >= rowLimit)
                     stop();
@@ -409,6 +483,16 @@ public abstract class DataLimits
                 return rowInCurrentPartition;
             }
 
+            public int rowCounted()
+            {
+                return rowCounted;
+            }
+
+            public int rowCountedInCurrentPartition()
+            {
+                return rowInCurrentPartition;
+            }
+
             public boolean isDone()
             {
                 return rowCounted >= rowLimit;
@@ -470,6 +554,12 @@ public abstract class DataLimits
         }
 
         @Override
+        public DataLimits withoutState()
+        {
+            return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
+        }
+
+        @Override
         public Counter newCounter(int nowInSec, boolean assumeLiveData)
         {
             return new PagingAwareCounter(nowInSec, assumeLiveData);
@@ -503,6 +593,499 @@ public abstract class DataLimits
     }
 
     /**
+     * <code>CQLLimits</code> used for GROUP BY queries or queries with aggregates.
+     * <p>Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. By consequence,
+     * the limits keep track of the number of rows as well as the number of groups.</p>
+     * <p>A group can only be counted if the next group or the end of the data is reached.</p>
+     */
+    private static class CQLGroupByLimits extends CQLLimits
+    {
+        /**
+         * The <code>GroupMaker</code> state
+         */
+        protected final GroupingState state;
+
+        /**
+         * The GROUP BY specification
+         */
+        protected final AggregationSpecification groupBySpec;
+
+        /**
+         * The limit on the number of groups
+         */
+        protected final int groupLimit;
+
+        /**
+         * The limit on the number of groups per partition
+         */
+        protected final int groupPerPartitionLimit;
+
+        public CQLGroupByLimits(int groupLimit,
+                                int groupPerPartitionLimit,
+                                int rowLimit,
+                                AggregationSpecification groupBySpec)
+        {
+            this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE);
+        }
+
+        private CQLGroupByLimits(int groupLimit,
+                                 int groupPerPartitionLimit,
+                                 int rowLimit,
+                                 AggregationSpecification groupBySpec,
+                                 GroupingState state)
+        {
+            super(rowLimit, NO_LIMIT, false);
+            this.groupLimit = groupLimit;
+            this.groupPerPartitionLimit = groupPerPartitionLimit;
+            this.groupBySpec = groupBySpec;
+            this.state = state;
+        }
+
+        @Override
+        public Kind kind()
+        {
+            return Kind.CQL_GROUP_BY_LIMIT;
+        }
+
+        @Override
+        public boolean isGroupByLimit()
+        {
+            return true;
+        }
+
+        public boolean isUnlimited()
+        {
+            return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT;
+        }
+
+        public DataLimits forShortReadRetry(int toFetch)
+        {
+            return new CQLLimits(toFetch);
+        }
+
+        @Override
+        public float estimateTotalResults(ColumnFamilyStore cfs)
+        {
+            // For the moment, we return the estimated number of rows as we have no good way of estimating 
+            // the number of groups that will be returned. Hopefully, we should be able to fix
+            // that problem at some point.
+            return super.estimateTotalResults(cfs);
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize)
+        {
+            return new CQLGroupByLimits(pageSize,
+                                        groupPerPartitionLimit,
+                                        rowLimit,
+                                        groupBySpec,
+                                        state);
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            return new CQLGroupByPagingLimits(pageSize,
+                                              groupPerPartitionLimit,
+                                              rowLimit,
+                                              groupBySpec,
+                                              state,
+                                              lastReturnedKey,
+                                              lastReturnedKeyRemaining);
+        }
+
+        @Override
+        public DataLimits forGroupByInternalPaging(GroupingState state)
+        {
+            return new CQLGroupByLimits(rowLimit,
+                                        groupPerPartitionLimit,
+                                        rowLimit,
+                                        groupBySpec,
+                                        state);
+        }
+
+        @Override
+        public Counter newCounter(int nowInSec, boolean assumeLiveData)
+        {
+            return new GroupByAwareCounter(nowInSec, assumeLiveData);
+        }
+
+        @Override
+        public int count()
+        {
+            return groupLimit;
+        }
+
+        @Override
+        public int perPartitionCount()
+        {
+            return groupPerPartitionLimit;
+        }
+
+        @Override
+        public DataLimits withoutState()
+        {
+            return state == GroupingState.EMPTY_STATE
+                 ? this
+                 : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+
+            if (groupLimit != NO_LIMIT)
+            {
+                sb.append("GROUP LIMIT ").append(groupLimit);
+                if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT)
+                    sb.append(' ');
+            }
+
+            if (groupPerPartitionLimit != NO_LIMIT)
+            {
+                sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit);
+                if (rowLimit != NO_LIMIT)
+                    sb.append(' ');
+            }
+
+            if (rowLimit != NO_LIMIT)
+            {
+                sb.append("LIMIT ").append(rowLimit);
+            }
+
+            return sb.toString();
+        }
+
+        @Override
+        public boolean isExhausted(Counter counter)
+        {
+            return ((GroupByAwareCounter) counter).rowCounted < rowLimit
+                    && counter.counted() < groupLimit;
+        }
+
+        protected class GroupByAwareCounter extends Counter
+        {
+            private final GroupMaker groupMaker;
+
+            /**
+             * The key of the partition being processed.
+             */
+            protected DecoratedKey currentPartitionKey;
+
+            /**
+             * The number of rows counted so far.
+             */
+            protected int rowCounted;
+
+            /**
+             * The number of rows counted so far in the current partition.
+             */
+            protected int rowCountedInCurrentPartition;
+
+            /**
+             * The number of groups counted so far. A group is counted only once it is complete
+             * (e.g the next one has been reached).
+             */
+            protected int groupCounted;
+
+            /**
+             * The number of groups in the current partition.
+             */
+            protected int groupInCurrentPartition;
+
+            protected boolean hasGroupStarted;
+
+            protected boolean hasLiveStaticRow;
+
+            protected boolean hasReturnedRowsFromCurrentPartition;
+
+            private GroupByAwareCounter(int nowInSec, boolean assumeLiveData)
+            {
+                super(nowInSec, assumeLiveData);
+                this.groupMaker = groupBySpec.newGroupMaker(state);
+
+                // If the end of the partition was reached at the same time than the row limit, the last group might
+                // not have been counted yet. Due to that we need to guess, based on the state, if the previous group
+                // is still open.
+                hasGroupStarted = state.hasClustering();
+            }
+
+            @Override
+            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
+            {
+                if (partitionKey.getKey().equals(state.partitionKey()))
+                {
+                    // The only case were we could have state.partitionKey() equals to the partition key
+                    // is if some of the partition rows have been returned in the previous page but the
+                    // partition was not exhausted (as the state partition key has not been updated yet).
+                    // Since we know we have returned rows, we know we have accounted for
+                    // the static row if any already, so force hasLiveStaticRow to false so we make sure to not count it
+                    // once more.
+                    hasLiveStaticRow = false;
+                    hasReturnedRowsFromCurrentPartition = true;
+                    hasGroupStarted = true;
+                }
+                else
+                {
+                    // We need to increment our count of groups if we have reached a new one and unless we had no new
+                    // content added since we closed our last group (that is, if hasGroupStarted). Note that we may get
+                    // here with hasGroupStarted == false in the following cases:
+                    // * the partition limit was reached for the previous partition
+                    // * the previous partition was containing only one static row
+                    // * the rows of the last group of the previous partition were all marked as deleted
+                    if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING))
+                    {
+                        incrementGroupCount();
+                        // If we detect, before starting the new partition, that we are done, we need to increase
+                        // the per partition group count of the previous partition as the next page will start from
+                        // there.
+                        if (isDone())
+                            incrementGroupInCurrentPartitionCount();
+                        hasGroupStarted = false;
+                    }
+                    hasReturnedRowsFromCurrentPartition = false;
+                    hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
+                }
+                currentPartitionKey = partitionKey;
+                // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
+                // because the pager need to retrieve the count associated to the last value it has returned.
+                if (!isDone())
+                {
+                    groupInCurrentPartition = 0;
+                    rowCountedInCurrentPartition = 0;
+                }
+            }
+
+            @Override
+            protected Row applyToStatic(Row row)
+            {
+                // It's possible that we're "done" if the partition we just started bumped the number of groups (in
+                // applyToPartition() above), in which case Transformation will still call this method. In that case, we
+                // want to ignore the static row, it should (and will) be returned with the next page/group if needs be.
+                if (isDone())
+                {
+                    hasLiveStaticRow = false; // The row has not been returned
+                    return Rows.EMPTY_STATIC_ROW;
+                }
+                return row;
+            }
+
+            @Override
+            public Row applyToRow(Row row)
+            {
+                // We want to check if the row belongs to a new group even if it has been deleted. The goal being
+                // to minimize the chances of having to go through the same data twice if we detect on the next
+                // non deleted row that we have reached the limit.
+                if (groupMaker.isNewGroup(currentPartitionKey, row.clustering()))
+                {
+                    if (hasGroupStarted)
+                    {
+                        incrementGroupCount();
+                        incrementGroupInCurrentPartitionCount();
+                    }
+                    hasGroupStarted = false;
+                }
+
+                // That row may have made us increment the group count, which may mean we're done for this partition, in
+                // which case we shouldn't count this row (it won't be returned).
+                if (isDoneForPartition())
+                {
+                    hasGroupStarted = false;
+                    return null;
+                }
+
+                if (isLive(row))
+                {
+                    hasGroupStarted = true;
+                    incrementRowCount();
+                    hasReturnedRowsFromCurrentPartition = true;
+                }
+
+                return row;
+            }
+
+            @Override
+            public int counted()
+            {
+                return groupCounted;
+            }
+
+            @Override
+            public int countedInCurrentPartition()
+            {
+                return groupInCurrentPartition;
+            }
+
+            @Override
+            public int rowCounted()
+            {
+                return rowCounted;
+            }
+
+            @Override
+            public int rowCountedInCurrentPartition()
+            {
+                return rowCountedInCurrentPartition;
+            }
+
+            protected void incrementRowCount()
+            {
+                rowCountedInCurrentPartition++;
+                if (++rowCounted >= rowLimit)
+                    stop();
+            }
+
+            private void incrementGroupCount()
+            {
+                groupCounted++;
+                if (groupCounted >= groupLimit)
+                    stop();
+            }
+
+            private void incrementGroupInCurrentPartitionCount()
+            {
+                groupInCurrentPartition++;
+                if (groupInCurrentPartition >= groupPerPartitionLimit)
+                    stopInPartition();
+            }
+
+            @Override
+            public boolean isDoneForPartition()
+            {
+                return isDone() || groupInCurrentPartition >= groupPerPartitionLimit;
+            }
+
+            @Override
+            public boolean isDone()
+            {
+                return groupCounted >= groupLimit;
+            }
+
+            @Override
+            public void onPartitionClose()
+            {
+                // Normally, we don't count static rows as from a CQL point of view, it will be merge with other
+                // rows in the partition. However, if we only have the static row, it will be returned as one group
+                // so count it.
+                if (hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
+                {
+                    incrementRowCount();
+                    incrementGroupCount();
+                    incrementGroupInCurrentPartitionCount();
+                    hasGroupStarted = false;
+                }
+                super.onPartitionClose();
+            }
+
+            @Override
+            public void onClose()
+            {
+                // Groups are only counted when the end of the group is reached.
+                // The end of a group is detected by 2 ways:
+                // 1) a new group is reached
+                // 2) the end of the data is reached
+                // We know that the end of the data is reached if the group limit has not been reached
+                // and the number of rows counted is smaller than the internal page size.
+                if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
+                {
+                    incrementGroupCount();
+                    incrementGroupInCurrentPartitionCount();
+                }
+
+                super.onClose();
+            }
+        }
+    }
+
+    private static class CQLGroupByPagingLimits extends CQLGroupByLimits
+    {
+        private final ByteBuffer lastReturnedKey;
+
+        private final int lastReturnedKeyRemaining;
+
+        public CQLGroupByPagingLimits(int groupLimit,
+                                      int groupPerPartitionLimit,
+                                      int rowLimit,
+                                      AggregationSpecification groupBySpec,
+                                      GroupingState state,
+                                      ByteBuffer lastReturnedKey,
+                                      int lastReturnedKeyRemaining)
+        {
+            super(groupLimit,
+                  groupPerPartitionLimit,
+                  rowLimit,
+                  groupBySpec,
+                  state);
+
+            this.lastReturnedKey = lastReturnedKey;
+            this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
+        }
+
+        @Override
+        public Kind kind()
+        {
+            return Kind.CQL_GROUP_BY_PAGING_LIMIT;
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public DataLimits forGroupByInternalPaging(GroupingState state)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Counter newCounter(int nowInSec, boolean assumeLiveData)
+        {
+            assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey());
+            return new PagingGroupByAwareCounter(nowInSec, assumeLiveData);
+        }
+
+        @Override
+        public DataLimits withoutState()
+        {
+            return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
+        }
+
+        private class PagingGroupByAwareCounter extends GroupByAwareCounter
+        {
+            private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData)
+            {
+                super(nowInSec, assumeLiveData);
+            }
+
+            @Override
+            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
+            {
+                if (partitionKey.getKey().equals(lastReturnedKey))
+                {
+                    currentPartitionKey = partitionKey;
+                    groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining;
+                    hasReturnedRowsFromCurrentPartition = true;
+                    hasLiveStaticRow = false;
+                    hasGroupStarted = state.hasClustering();
+                }
+                else
+                {
+                    super.applyToPartition(partitionKey, staticRow);
+                }
+            }
+        }
+    }
+
+    /**
      * Limits used by thrift; this count partition and cells.
      */
     private static class ThriftLimits extends DataLimits
@@ -593,6 +1176,11 @@ public abstract class DataLimits
             return cellPerPartitionLimit;
         }
 
+        public DataLimits withoutState()
+        {
+            return this;
+        }
+
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
             // remember that getMeansColumns returns a number of cells: we should clean nomenclature
@@ -602,17 +1190,13 @@ public abstract class DataLimits
 
         protected class ThriftCounter extends Counter
         {
-            protected final int nowInSec;
-            protected final boolean assumeLiveData;
-
             protected int partitionsCounted;
             protected int cellsCounted;
             protected int cellsInCurrentPartition;
 
             public ThriftCounter(int nowInSec, boolean assumeLiveData)
             {
-                this.nowInSec = nowInSec;
-                this.assumeLiveData = assumeLiveData;
+                super(nowInSec, assumeLiveData);
             }
 
             @Override
@@ -656,6 +1240,16 @@ public abstract class DataLimits
                 return cellsInCurrentPartition;
             }
 
+            public int rowCounted()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public int rowCountedInCurrentPartition()
+            {
+                throw new UnsupportedOperationException();
+            }
+
             public boolean isDone()
             {
                 return partitionsCounted >= partitionLimit;
@@ -723,7 +1317,7 @@ public abstract class DataLimits
             public Row applyToRow(Row row)
             {
                 // In the internal format, a row == a super column, so that's what we want to count.
-                if (assumeLiveData || row.hasLiveData(nowInSec))
+                if (isLive(row))
                 {
                     ++cellsCounted;
                     if (++cellsInCurrentPartition >= cellPerPartitionLimit)
@@ -736,7 +1330,7 @@ public abstract class DataLimits
 
     public static class Serializer
     {
-        public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException
+        public void serialize(DataLimits limits, DataOutputPlus out, int version, ClusteringComparator comparator) throws IOException
         {
             out.writeByte(limits.kind().ordinal());
             switch (limits.kind())
@@ -754,6 +1348,25 @@ public abstract class DataLimits
                         out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                     }
                     break;
+                case CQL_GROUP_BY_LIMIT:
+                case CQL_GROUP_BY_PAGING_LIMIT:
+                    CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
+                    out.writeUnsignedVInt(groupByLimits.groupLimit);
+                    out.writeUnsignedVInt(groupByLimits.groupPerPartitionLimit);
+                    out.writeUnsignedVInt(groupByLimits.rowLimit);
+
+                    AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
+                    AggregationSpecification.serializer.serialize(groupBySpec, out, version);
+
+                    GroupingState.serializer.serialize(groupByLimits.state, out, version, comparator);
+
+                    if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
+                    {
+                        CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
+                        ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
+                        out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
+                     }
+                     break;
                 case THRIFT_LIMIT:
                 case SUPER_COLUMN_COUNTING_LIMIT:
                     ThriftLimits thriftLimits = (ThriftLimits)limits;
@@ -763,54 +1376,102 @@ public abstract class DataLimits
             }
         }
 
-        public DataLimits deserialize(DataInputPlus in, int version) throws IOException
+        public DataLimits deserialize(DataInputPlus in, int version, ClusteringComparator comparator) throws IOException
         {
             Kind kind = Kind.values()[in.readUnsignedByte()];
             switch (kind)
             {
                 case CQL_LIMIT:
                 case CQL_PAGING_LIMIT:
-                    int rowLimit = (int)in.readUnsignedVInt();
-                    int perPartitionLimit = (int)in.readUnsignedVInt();
+                {
+                    int rowLimit = (int) in.readUnsignedVInt();
+                    int perPartitionLimit = (int) in.readUnsignedVInt();
                     boolean isDistinct = in.readBoolean();
                     if (kind == Kind.CQL_LIMIT)
                         return cqlLimits(rowLimit, perPartitionLimit, isDistinct);
-
                     ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
-                    int lastRemaining = (int)in.readUnsignedVInt();
+                    int lastRemaining = (int) in.readUnsignedVInt();
                     return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
+                }
+                case CQL_GROUP_BY_LIMIT:
+                case CQL_GROUP_BY_PAGING_LIMIT:
+                {
+                    int groupLimit = (int) in.readUnsignedVInt();
+                    int groupPerPartitionLimit = (int) in.readUnsignedVInt();
+                    int rowLimit = (int) in.readUnsignedVInt();
+
+                    AggregationSpecification groupBySpec = AggregationSpecification.serializer.deserialize(in, version, comparator);
+
+                    GroupingState state = GroupingState.serializer.deserialize(in, version, comparator);
+
+                    if (kind == Kind.CQL_GROUP_BY_LIMIT)
+                        return new CQLGroupByLimits(groupLimit,
+                                                    groupPerPartitionLimit,
+                                                    rowLimit,
+                                                    groupBySpec,
+                                                    state);
+
+                    ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
+                    int lastRemaining = (int) in.readUnsignedVInt();
+                    return new CQLGroupByPagingLimits(groupLimit,
+                                                      groupPerPartitionLimit,
+                                                      rowLimit,
+                                                      groupBySpec,
+                                                      state,
+                                                      lastKey,
+                                                      lastRemaining);
+                }
                 case THRIFT_LIMIT:
                 case SUPER_COLUMN_COUNTING_LIMIT:
-                    int partitionLimit = (int)in.readUnsignedVInt();
-                    int cellPerPartitionLimit = (int)in.readUnsignedVInt();
+                    int partitionLimit = (int) in.readUnsignedVInt();
+                    int cellPerPartitionLimit = (int) in.readUnsignedVInt();
                     return kind == Kind.THRIFT_LIMIT
-                         ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
-                         : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
+                            ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
+                            : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
             }
             throw new AssertionError();
         }
 
-        public long serializedSize(DataLimits limits, int version)
+        public long serializedSize(DataLimits limits, int version, ClusteringComparator comparator)
         {
-            long size = TypeSizes.sizeof((byte)limits.kind().ordinal());
+            long size = TypeSizes.sizeof((byte) limits.kind().ordinal());
             switch (limits.kind())
             {
                 case CQL_LIMIT:
                 case CQL_PAGING_LIMIT:
-                    CQLLimits cqlLimits = (CQLLimits)limits;
+                    CQLLimits cqlLimits = (CQLLimits) limits;
                     size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
                     size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
                     size += TypeSizes.sizeof(cqlLimits.isDistinct);
                     if (limits.kind() == Kind.CQL_PAGING_LIMIT)
                     {
-                        CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
+                        CQLPagingLimits pagingLimits = (CQLPagingLimits) cqlLimits;
+                        size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
+                        size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
+                    }
+                    break;
+                case CQL_GROUP_BY_LIMIT:
+                case CQL_GROUP_BY_PAGING_LIMIT:
+                    CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
+                    size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupLimit);
+                    size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupPerPartitionLimit);
+                    size += TypeSizes.sizeofUnsignedVInt(groupByLimits.rowLimit);
+
+                    AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
+                    size += AggregationSpecification.serializer.serializedSize(groupBySpec, version);
+
+                    size += GroupingState.serializer.serializedSize(groupByLimits.state, version, comparator);
+
+                    if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
+                    {
+                        CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
                         size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
                         size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                     }
                     break;
                 case THRIFT_LIMIT:
                 case SUPER_COLUMN_COUNTING_LIMIT:
-                    ThriftLimits thriftLimits = (ThriftLimits)limits;
+                    ThriftLimits thriftLimits = (ThriftLimits) limits;
                     size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
                     size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
                     break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 2c1b347..b9ae933 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.DataLimits.Counter;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.MoreRows;
@@ -436,9 +437,9 @@ public class DataResolver extends ResponseResolver
                 // Also note that we only get here once all the results for this node have been returned, and so
                 // if the node had returned the requested number but we still get there, it imply some results were
                 // skipped during reconciliation.
-                if (lastCount == counter.counted() || !counter.isDoneForPartition())
+                if (lastCount == counted(counter) || !counter.isDoneForPartition())
                     return null;
-                lastCount = counter.counted();
+                lastCount = counted(counter);
 
                 assert !postReconciliationCounter.isDoneForPartition();
 
@@ -450,8 +451,8 @@ public class DataResolver extends ResponseResolver
                 // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
                 // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
                 // counting iterator.
-                int n = postReconciliationCounter.countedInCurrentPartition();
-                int x = counter.countedInCurrentPartition();
+                int n = countedInCurrentPartition(postReconciliationCounter);
+                int x = countedInCurrentPartition(counter);
                 int toQuery = Math.max(((n * n) / x) - n, 1);
 
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
@@ -468,6 +469,38 @@ public class DataResolver extends ResponseResolver
                 return doShortReadRetry(cmd);
             }
 
+            /**
+             * Returns the number of results counted by the counter.
+             *
+             * @param counter the counter.
+             * @return the number of results counted by the counter
+             */
+            private int counted(Counter counter)
+            {
+                // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
+                // groups.
+                if (command.limits().isGroupByLimit())
+                    return counter.rowCounted();
+
+                return counter.counted();
+            }
+
+            /**
+             * Returns the number of results counted in the partition by the counter.
+             *
+             * @param counter the counter.
+             * @return the number of results counted in the partition by the counter
+             */
+            private int countedInCurrentPartition(Counter counter)
+            {
+                // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
+                // the number of groups in the current partition.
+                if (command.limits().isGroupByLimit())
+                    return counter.rowCountedInCurrentPartition();
+
+                return counter.countedInCurrentPartition();
+            }
+
             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
             {
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2839259..bc8c46c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2080,9 +2080,18 @@ public class StorageProxy implements StorageProxyMBean
                          rowsPerRange, (int) remainingRows, concurrencyFactor);
         }
 
-        private SingleRangeResponse query(RangeForQuery toQuery)
+        /**
+         * Queries the provided sub-range.
+         *
+         * @param toQuery the subRange to query.
+         * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
+         * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the
+         * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
+         * that it's the query that "continues" whatever we're previously queried).
+         */
+        private SingleRangeResponse query(RangeForQuery toQuery, boolean isFirst)
         {
-            PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+            PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst);
 
             DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
 
@@ -2115,7 +2124,7 @@ public class StorageProxy implements StorageProxyMBean
             List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
             for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
             {
-                concurrentQueries.add(query(ranges.next()));
+                concurrentQueries.add(query(ranges.next(), i == 0));
                 ++rangesQueried;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 06252ef..01a56c4 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -22,8 +22,6 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
 
 abstract class AbstractQueryPager implements QueryPager
@@ -57,23 +55,25 @@ abstract class AbstractQueryPager implements QueryPager
         return command.executionController();
     }
 
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState)
     {
         if (isExhausted())
             return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+
         return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
     }
 
-    public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
+    public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
     {
         if (isExhausted())
             return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+
         return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager);
     }
 
@@ -81,6 +81,7 @@ abstract class AbstractQueryPager implements QueryPager
     {
         private final DataLimits pageLimits;
         private final DataLimits.Counter counter;
+        private DecoratedKey currentKey;
         private Row lastRow;
         private boolean isFirstPartition = true;
 
@@ -93,10 +94,7 @@ abstract class AbstractQueryPager implements QueryPager
         @Override
         public RowIterator applyToPartition(RowIterator partition)
         {
-            DecoratedKey key = partition.partitionKey();
-            if (lastKey == null || !lastKey.equals(key))
-                remainingInPartition = limits.perPartitionCount();
-            lastKey = key;
+            currentKey = partition.partitionKey();
 
             // If this is the first partition of this page, this could be the continuation of a partition we've started
             // on the previous page. In which case, we could have the problem that the partition has no more "regular"
@@ -106,7 +104,7 @@ abstract class AbstractQueryPager implements QueryPager
             if (isFirstPartition)
             {
                 isFirstPartition = false;
-                if (isPreviouslyReturnedPartition(key) && !partition.hasNext())
+                if (isPreviouslyReturnedPartition(currentKey) && !partition.hasNext())
                 {
                     partition.close();
                     return null;
@@ -119,10 +117,12 @@ abstract class AbstractQueryPager implements QueryPager
         @Override
         public void onClose()
         {
+            // In some case like GROUP BY a counter need to know when the processing is completed.
+            counter.onClose();
+
             recordLast(lastKey, lastRow);
 
-            int counted = counter.counted();
-            remaining -= counted;
+            remaining -= counter.counted();
             // If the clustering of the last row returned is a static one, it means that the partition was only
             // containing data within the static columns. If the clustering of the last row returned is empty
             // it means that there is only one row per partition. Therefore, in both cases there are no data remaining
@@ -136,19 +136,28 @@ abstract class AbstractQueryPager implements QueryPager
             {
                 remainingInPartition -= counter.countedInCurrentPartition();
             }
-            exhausted = counted < pageLimits.count();
+            exhausted = pageLimits.isExhausted(counter);
         }
 
         public Row applyToStatic(Row row)
         {
             if (!row.isEmpty())
+            {
+                remainingInPartition = limits.perPartitionCount();
+                lastKey = currentKey;
                 lastRow = row;
+            }
             return row;
         }
 
         @Override
         public Row applyToRow(Row row)
         {
+            if (!currentKey.equals(lastKey))
+            {
+                remainingInPartition = limits.perPartitionCount();
+                lastKey = currentKey;
+            }
             lastRow = row;
             return row;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
new file mode 100644
index 0000000..1bdaac6
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.aggregation.GroupingState;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * {@code QueryPager} that takes care of fetching the pages for aggregation queries.
+ * <p>
+ * For aggregation/group by queries, the user page size is in number of groups. But each group could be composed of very
+ * many rows so to avoid running into OOMs, this pager will page internal queries into sub-pages. So each call to
+ * {@link fetchPage} may (transparently) yield multiple internal queries (sub-pages).
+ */
+public final class AggregationQueryPager implements QueryPager
+{
+    private final DataLimits limits;
+
+    // The sub-pager, used to retrieve the next sub-page.
+    private QueryPager subPager;
+
+    public AggregationQueryPager(QueryPager subPager, DataLimits limits)
+    {
+        this.subPager = subPager;
+        this.limits = limits;
+    }
+
+    @Override
+    public PartitionIterator fetchPage(int pageSize,
+                                       ConsistencyLevel consistency,
+                                       ClientState clientState)
+    {
+        if (limits.isGroupByLimit())
+            return new GroupByPartitionIterator(pageSize, consistency, clientState);
+
+        return new AggregationPartitionIterator(pageSize, consistency, clientState);
+    }
+
+    @Override
+    public ReadExecutionController executionController()
+    {
+        return subPager.executionController();
+    }
+
+    @Override
+    public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
+    {
+        if (limits.isGroupByLimit())
+            return new GroupByPartitionIterator(pageSize, executionController);
+
+        return new AggregationPartitionIterator(pageSize, executionController);
+    }
+
+    @Override
+    public boolean isExhausted()
+    {
+        return subPager.isExhausted();
+    }
+
+    @Override
+    public int maxRemaining()
+    {
+        return subPager.maxRemaining();
+    }
+
+    @Override
+    public PagingState state()
+    {
+        return subPager.state();
+    }
+
+    @Override
+    public QueryPager withUpdatedLimit(DataLimits newLimits)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * <code>PartitionIterator</code> that automatically fetch a new sub-page of data if needed when the current iterator is
+     * exhausted.
+     */
+    public class GroupByPartitionIterator implements PartitionIterator
+    {
+        /**
+         * The top-level page size in number of groups.
+         */
+        private final int pageSize;
+
+        // For "normal" queries
+        private final ConsistencyLevel consistency;
+        private final ClientState clientState;
+
+        // For internal queries
+        private final ReadExecutionController executionController;
+
+        /**
+         * The <code>PartitionIterator</code> over the last page retrieved.
+         */
+        private PartitionIterator partitionIterator;
+
+        /**
+         * The next <code>RowIterator</code> to be returned.
+         */
+        private RowIterator next;
+
+        /**
+         * Specify if all the data have been returned.
+         */
+        private boolean endOfData;
+
+        /**
+         * Keeps track if the partitionIterator has been closed or not.
+         */
+        private boolean closed;
+
+        /**
+         * The key of the last partition processed.
+         */
+        private ByteBuffer lastPartitionKey;
+
+        /**
+         * The clustering of the last row processed
+         */
+        private Clustering lastClustering;
+
+        /**
+         * The initial amount of row remaining
+         */
+        private int initialMaxRemaining;
+
+        public GroupByPartitionIterator(int pageSize,
+                                         ConsistencyLevel consistency,
+                                         ClientState clientState)
+        {
+            this(pageSize, consistency, clientState, null);
+        }
+
+        public GroupByPartitionIterator(int pageSize,
+                                        ReadExecutionController executionController)
+       {
+           this(pageSize, null, null, executionController);
+       }
+
+        private GroupByPartitionIterator(int pageSize,
+                                         ConsistencyLevel consistency,
+                                         ClientState clientState,
+                                         ReadExecutionController executionController)
+        {
+            this.pageSize = handlePagingOff(pageSize);
+            this.consistency = consistency;
+            this.clientState = clientState;
+            this.executionController = executionController;
+        }
+
+        private int handlePagingOff(int pageSize)
+        {
+            // If the paging is off, the pageSize will be <= 0. So we need to replace
+            // it by DataLimits.NO_LIMIT
+            return pageSize <= 0 ? DataLimits.NO_LIMIT : pageSize;
+        }
+
+        public final void close()
+        {
+            if (!closed)
+            {
+                closed = true;
+                partitionIterator.close();
+            }
+        }
+
+        public final boolean hasNext()
+        {
+            if (endOfData)
+                return false;
+
+            if (next != null)
+                return true;
+
+            fetchNextRowIterator();
+
+            return next != null;
+        }
+
+        /**
+         * Loads the next <code>RowIterator</code> to be returned.
+         */
+        private void fetchNextRowIterator()
+        {
+            if (partitionIterator == null)
+            {
+                initialMaxRemaining = subPager.maxRemaining();
+                partitionIterator = fetchSubPage(pageSize);
+            }
+
+            while (!partitionIterator.hasNext())
+            {
+                partitionIterator.close();
+
+                int counted = initialMaxRemaining - subPager.maxRemaining();
+
+                if (isDone(pageSize, counted) || subPager.isExhausted())
+                {
+                    endOfData = true;
+                    closed = true;
+                    return;
+                }
+
+                subPager = updatePagerLimit(subPager, limits, lastPartitionKey, lastClustering);
+                partitionIterator = fetchSubPage(computeSubPageSize(pageSize, counted));
+            }
+
+            next = partitionIterator.next();
+        }
+
+        protected boolean isDone(int pageSize, int counted)
+        {
+            return counted == pageSize;
+        }
+
+        /**
+         * Updates the pager with the new limits if needed.
+         *
+         * @param pager the pager previoulsy used
+         * @param limits the DataLimits
+         * @param lastPartitionKey the partition key of the last row returned
+         * @param lastClustering the clustering of the last row returned
+         * @return the pager to use to query the next page of data
+         */
+        protected QueryPager updatePagerLimit(QueryPager pager,
+                                              DataLimits limits,
+                                              ByteBuffer lastPartitionKey,
+                                              Clustering lastClustering)
+        {
+            GroupingState state = new GroupingState(lastPartitionKey, lastClustering);
+            DataLimits newLimits = limits.forGroupByInternalPaging(state);
+            return pager.withUpdatedLimit(newLimits);
+        }
+
+        /**
+         * Computes the size of the next sub-page to retrieve.
+         *
+         * @param pageSize the top-level page size
+         * @param counted the number of result returned so far by the previous sub-pages
+         * @return the size of the next sub-page to retrieve
+         */
+        protected int computeSubPageSize(int pageSize, int counted)
+        {
+            return pageSize - counted;
+        }
+
+        /**
+         * Fetchs the next sub-page.
+         *
+         * @param subPageSize the sub-page size in number of groups
+         * @return the next sub-page
+         */
+        private final PartitionIterator fetchSubPage(int subPageSize)
+        {
+            return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState)
+                                       : subPager.fetchPageInternal(subPageSize, executionController);
+        }
+
+        public final RowIterator next()
+        {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            RowIterator iterator = new GroupByRowIterator(next);
+            lastPartitionKey = iterator.partitionKey().getKey();
+            next = null;
+            return iterator;
+        }
+
+        private class GroupByRowIterator implements RowIterator
+        {
+            /**
+             * The decorated <code>RowIterator</code>.
+             */
+            private RowIterator rowIterator;
+
+            /**
+             * Keeps track if the decorated iterator has been closed or not.
+             */
+            private boolean closed;
+
+            public GroupByRowIterator(RowIterator delegate)
+            {
+                this.rowIterator = delegate;
+            }
+
+            public CFMetaData metadata()
+            {
+                return rowIterator.metadata();
+            }
+
+            public boolean isReverseOrder()
+            {
+                return rowIterator.isReverseOrder();
+            }
+
+            public PartitionColumns columns()
+            {
+                return rowIterator.columns();
+            }
+
+            public DecoratedKey partitionKey()
+            {
+                return rowIterator.partitionKey();
+            }
+
+            public Row staticRow()
+            {
+                Row row = rowIterator.staticRow();
+                lastClustering = null;
+                return row;
+            }
+
+            public boolean isEmpty()
+            {
+                return this.rowIterator.isEmpty() && !hasNext();
+            }
+
+            public void close()
+            {
+                if (!closed)
+                    rowIterator.close();
+            }
+
+            public boolean hasNext()
+            {
+                if (rowIterator.hasNext())
+                    return true;
+
+                DecoratedKey partitionKey = rowIterator.partitionKey();
+
+                rowIterator.close();
+
+                // Fetch the next RowIterator
+                GroupByPartitionIterator.this.hasNext();
+
+                // if the previous page was ending within the partition the
+                // next RowIterator is the continuation of this one
+                if (next != null && partitionKey.equals(next.partitionKey()))
+                {
+                    rowIterator = next;
+                    next = null;
+                    return rowIterator.hasNext();
+                }
+
+                closed = true;
+                return false;
+            }
+
+            public Row next()
+            {
+                Row row = this.rowIterator.next();
+                lastClustering = row.clustering();
+                return row;
+            }
+        }
+    }
+
+    /**
+     * <code>PartitionIterator</code> for queries without Group By but with aggregates.
+     * <p>For maintaining backward compatibility we are forced to use the {@link DataLimits.CQLLimits} instead of the
+     * {@link DataLimits.CQLGroupByLimits}. Due to that pages need to be fetched in a different way.</p>
+     */
+    public final class AggregationPartitionIterator extends GroupByPartitionIterator
+    {
+        public AggregationPartitionIterator(int pageSize,
+                                            ConsistencyLevel consistency,
+                                            ClientState clientState)
+        {
+            super(pageSize, consistency, clientState);
+        }
+
+        public AggregationPartitionIterator(int pageSize,
+                                            ReadExecutionController executionController)
+        {
+            super(pageSize, executionController);
+        }
+
+        @Override
+        protected QueryPager updatePagerLimit(QueryPager pager,
+                                              DataLimits limits,
+                                              ByteBuffer lastPartitionKey,
+                                              Clustering lastClustering)
+        {
+            return pager;
+        }
+
+        @Override
+        protected boolean isDone(int pageSize, int counted)
+        {
+            return false;
+        }
+
+        @Override
+        protected int computeSubPageSize(int pageSize, int counted)
+        {
+            return pageSize;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 57d6c62..9670f28 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.service.pager;
 
 import org.apache.cassandra.utils.AbstractIterator;
 
+import java.util.Arrays;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.DataLimits;
@@ -81,6 +83,27 @@ public class MultiPartitionPager implements QueryPager
         remaining = state == null ? limit.count() : state.remaining;
     }
 
+    private MultiPartitionPager(SinglePartitionPager[] pagers, DataLimits limit, int nowInSec, int remaining, int current)
+    {
+        this.pagers = pagers;
+        this.limit = limit;
+        this.nowInSec = nowInSec;
+        this.remaining = remaining;
+        this.current = current;
+    }
+
+    public QueryPager withUpdatedLimit(DataLimits newLimits)
+    {
+        SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length);
+        newPagers[current] = newPagers[current].withUpdatedLimit(newLimits);
+
+        return new MultiPartitionPager(newPagers,
+                                       newLimits,
+                                       nowInSec,
+                                       remaining,
+                                       current);
+    }
+
     public PagingState state()
     {
         // Sets current to the first non-exhausted pager
@@ -122,27 +145,21 @@ public class MultiPartitionPager implements QueryPager
     public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
     {
         int toQuery = Math.min(remaining, pageSize);
-        PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
-        DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
-        iter.setCounter(counter);
-        return counter.applyTo(iter);
+        return new PagersIterator(toQuery, consistency, clientState, null);
     }
 
     @SuppressWarnings("resource") // iter closed via countingIter
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
     {
         int toQuery = Math.min(remaining, pageSize);
-        PagersIterator iter = new PagersIterator(toQuery, null, null, executionController);
-        DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
-        iter.setCounter(counter);
-        return counter.applyTo(iter);
+        return new PagersIterator(toQuery, null, null, executionController);
     }
 
     private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
     {
         private final int pageSize;
         private PartitionIterator result;
-        private DataLimits.Counter counter;
+        private boolean closed;
 
         // For "normal" queries
         private final ConsistencyLevel consistency;
@@ -151,6 +168,9 @@ public class MultiPartitionPager implements QueryPager
         // For internal queries
         private final ReadExecutionController executionController;
 
+        private int pagerMaxRemaining;
+        private int counted;
+
         public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController)
         {
             this.pageSize = pageSize;
@@ -159,23 +179,30 @@ public class MultiPartitionPager implements QueryPager
             this.executionController = executionController;
         }
 
-        public void setCounter(DataLimits.Counter counter)
-        {
-            this.counter = counter;
-        }
-
         protected RowIterator computeNext()
         {
             while (result == null || !result.hasNext())
             {
                 if (result != null)
+                {
                     result.close();
-
-                // This sets us on the first non-exhausted pager
-                if (isExhausted())
+                    counted += pagerMaxRemaining - pagers[current].maxRemaining();
+                }
+
+                // We are done if we have reached the page size or in the case of GROUP BY if the current pager
+                // is not exhausted.
+                boolean isDone = counted >= pageSize
+                        || (result != null && limit.isGroupByLimit() && !pagers[current].isExhausted());
+
+                // isExhausted() will sets us on the first non-exhausted pager
+                if (isDone || isExhausted())
+                {
+                    closed = true;
                     return endOfData();
+                }
 
-                int toQuery = pageSize - counter.counted();
+                pagerMaxRemaining = pagers[current].maxRemaining();
+                int toQuery = pageSize - counted;
                 result = consistency == null
                        ? pagers[current].fetchPageInternal(toQuery, executionController)
                        : pagers[current].fetchPage(toQuery, consistency, clientState);
@@ -185,8 +212,8 @@ public class MultiPartitionPager implements QueryPager
 
         public void close()
         {
-            remaining -= counter.counted();
-            if (result != null)
+            remaining -= counted;
+            if (result != null && !closed)
                 result.close();
         }
     }
@@ -195,4 +222,4 @@ public class MultiPartitionPager implements QueryPager
     {
         return remaining;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 9c216e3..5a7cccf 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -19,9 +19,6 @@ package org.apache.cassandra.service.pager;
 
 import java.util.Optional;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.Row;
@@ -38,8 +35,6 @@ import org.apache.cassandra.schema.IndexMetadata;
  */
 public class PartitionRangeQueryPager extends AbstractQueryPager
 {
-    private static final Logger logger = LoggerFactory.getLogger(PartitionRangeQueryPager.class);
-
     private volatile DecoratedKey lastReturnedKey;
     private volatile PagingState.RowMark lastReturnedRow;
 
@@ -55,6 +50,29 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
         }
     }
 
+    public PartitionRangeQueryPager(ReadCommand command,
+                                    int protocolVersion,
+                                    DecoratedKey lastReturnedKey,
+                                    PagingState.RowMark lastReturnedRow,
+                                    int remaining,
+                                    int remainingInPartition)
+    {
+        super(command, protocolVersion);
+        this.lastReturnedKey = lastReturnedKey;
+        this.lastReturnedRow = lastReturnedRow;
+        restoreState(lastReturnedKey, remaining, remainingInPartition);
+    }
+
+    public PartitionRangeQueryPager withUpdatedLimit(DataLimits newLimits)
+    {
+        return new PartitionRangeQueryPager(command.withUpdatedLimit(newLimits),
+                                            protocolVersion,
+                                            lastReturnedKey,
+                                            lastReturnedRow,
+                                            maxRemaining(),
+                                            remainingInPartition());
+    }
+
     public PagingState state()
     {
         return lastReturnedKey == null

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index e2d7f5e..edd2a55 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service.pager;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.EmptyIterators;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.RequestExecutionException;
@@ -77,6 +78,11 @@ public interface QueryPager
         {
             return null;
         }
+
+        public QueryPager withUpdatedLimit(DataLimits newLimits)
+        {
+            throw new UnsupportedOperationException();
+        }
     };
 
     /**
@@ -134,4 +140,12 @@ public interface QueryPager
      * beginning. If the pager is exhausted, the result is undefined.
      */
     public PagingState state();
+
+    /**
+     * Creates a new <code>QueryPager</code> that use the new limits.
+     *
+     * @param newLimits the new limits
+     * @return a new <code>QueryPager</code> that use the new limits
+     */
+    public QueryPager withUpdatedLimit(DataLimits newLimits);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index acb55bb..59b2a51 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -19,9 +19,6 @@ package org.apache.cassandra.service.pager;
 
 import java.nio.ByteBuffer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
@@ -33,8 +30,6 @@ import org.apache.cassandra.db.filter.*;
  */
 public class SinglePartitionPager extends AbstractQueryPager
 {
-    private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
-
     private final SinglePartitionReadCommand command;
 
     private volatile PagingState.RowMark lastReturned;
@@ -51,6 +46,28 @@ public class SinglePartitionPager extends AbstractQueryPager
         }
     }
 
+    private SinglePartitionPager(SinglePartitionReadCommand command,
+                                 int protocolVersion,
+                                 PagingState.RowMark rowMark,
+                                 int remaining,
+                                 int remainingInPartition)
+    {
+        super(command, protocolVersion);
+        this.command = command;
+        this.lastReturned = rowMark;
+        restoreState(command.partitionKey(), remaining, remainingInPartition);
+    }
+
+    @Override
+    public SinglePartitionPager withUpdatedLimit(DataLimits newLimits)
+    {
+        return new SinglePartitionPager(command.withUpdatedLimit(newLimits),
+                                        protocolVersion,
+                                        lastReturned,
+                                        maxRemaining(),
+                                        remainingInPartition());
+    }
+
     public ByteBuffer key()
     {
         return command.partitionKey().getKey();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 19e40d2..d4bc40b 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -717,7 +717,7 @@ public abstract class CQLTester
         return formatQuery(KEYSPACE, query);
     }
 
-    protected String formatQuery(String keyspace, String query)
+    protected final String formatQuery(String keyspace, String query)
     {
         String currentTable = currentTable();
         return currentTable == null ? query : String.format(query, keyspace + "." + currentTable);
@@ -771,6 +771,11 @@ public abstract class CQLTester
         return rs;
     }
 
+    protected void assertRowsNet(ResultSet result, Object[]... rows)
+    {
+        assertRowsNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1), result, rows);
+    }
+
     protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows)
     {
         // necessary as we need cluster objects to supply CodecRegistry.