You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/11/20 22:06:53 UTC

[phoenix] branch master updated: PHOENIX-6207 Paged server side grouped aggregate operations

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d0e86e  PHOENIX-6207 Paged server side grouped aggregate operations
0d0e86e is described below

commit 0d0e86e7ba63d20e92d4e8c03259344a958dbcd1
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Wed Nov 18 18:10:05 2020 -0800

    PHOENIX-6207 Paged server side grouped aggregate operations
---
 .../GroupedAggregateRegionObserver.java            | 347 +++++++++++++--------
 .../phoenix/iterate/ScanningResultIterator.java    |   4 +
 .../org/apache/phoenix/query/QueryServices.java    |   1 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  35 ++-
 .../java/org/apache/phoenix/query/BaseTest.java    |   9 +-
 6 files changed, 257 insertions(+), 142 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index aecf99b..9766c5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -22,8 +22,10 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -64,6 +66,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
@@ -73,6 +76,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
@@ -176,11 +180,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
             if (limitBytes != null) {
                 limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
             }
+            long pageSizeInMs = Long.MAX_VALUE;
+            if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
+                byte[] pageSizeFromScan =
+                        scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
+                if (pageSizeFromScan != null) {
+                    pageSizeInMs = Bytes.toLong(pageSizeFromScan);
+                } else {
+                    pageSizeInMs = c.getEnvironment().getConfiguration().getLong(GROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
+                                    QueryServicesOptions.DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
+                }
+            }
             if (keyOrdered) { // Optimize by taking advantage that the rows are
                               // already in the required group by key order
-                return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
+                return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
             } else { // Otherwse, collect them all up in an in memory map
-                return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
+                return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
             }
         }
     }
@@ -364,6 +379,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
         }
 
     }
+
     private static final class GroupByCacheFactory {
         public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();
 
@@ -381,193 +397,250 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
             return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals);
         }
     }
+
     /**
      * Used for an aggregate query in which the key order does not necessarily match the group by
      * key order. In this case, we must collect all distinct groups within a region into a map,
      * aggregating as we go.
      * @param limit TODO
      */
-    private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
-            final RegionScanner scanner, final List<Expression> expressions,
-            final ServerAggregators aggregators, long limit) throws IOException {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(LogUtil.addCustomAnnotations(
-                    "Grouped aggregation over unordered rows with scan " + scan
-                    + ", group by " + expressions + ", aggregators " + aggregators,
-                    ScanUtil.getCustomAnnotations(scan)));
-        }
-        RegionCoprocessorEnvironment env = c.getEnvironment();
-        Configuration conf = env.getConfiguration();
-        int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
-        byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
-        if (estDistValsBytes != null) {
-            // Allocate 1.5x estimation
-            estDistVals = Math.max(MIN_DISTINCT_VALUES,
-                            (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
-        }
-        
-        Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-        boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
-        final boolean spillableEnabled =
-                conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
-        final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-
-        GroupByCache groupByCache =
-                GroupByCacheFactory.INSTANCE.newCache(
-                        env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
-                        aggregators, estDistVals);
-        boolean success = false;
-        try {
-            boolean hasMore;
-            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+    private static class UnorderedGroupByRegionScanner extends BaseRegionScanner {
+        private final Region region;
+        private final Pair<Integer, Integer> minMaxQualifiers;
+        private final boolean useQualifierAsIndex;
+        private final PTable.QualifierEncodingScheme encodingScheme;
+        private final ServerAggregators aggregators;
+        private final long limit;
+        private final List<Expression> expressions;
+        private final long pageSizeInMs;
+        private RegionScanner regionScanner = null;
+        private final GroupByCache groupByCache;
+
+        private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                              final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
+                                              final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+            super(scanner);
+            this.region = c.getEnvironment().getRegion();
+            this.aggregators = aggregators;
+            this.limit = limit;
+            this.pageSizeInMs = pageSizeInMs;
+            this.expressions = expressions;
+            RegionCoprocessorEnvironment env = c.getEnvironment();
+            Configuration conf = env.getConfiguration();
+            int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
+            byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
+            if (estDistValsBytes != null) {
+                // Allocate 1.5x estimation
+                estDistVals = Math.max(MIN_DISTINCT_VALUES,
+                        (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
+            }
+
+            minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+            useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+            final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
+            encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+
+            groupByCache = GroupByCacheFactory.INSTANCE.newCache(
+                    env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals);
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(LogUtil.addCustomAnnotations(
+                        "Grouped aggregation over unordered rows with scan " + scan
+                                + ", group by " + expressions + ", aggregators " + aggregators,
+                        ScanUtil.getCustomAnnotations(scan)));
+                LOGGER.debug(LogUtil.addCustomAnnotations(
                         "Spillable groupby enabled: " + spillableEnabled,
                         ScanUtil.getCustomAnnotations(scan)));
             }
-            Region region = c.getEnvironment().getRegion();
+        }
+
+        @Override
+        public boolean next(List<Cell> resultsToReturn) throws IOException {
+            boolean hasMore;
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            long now;
+            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
             boolean acquiredLock = false;
             try {
                 region.startRegionOperation();
                 acquiredLock = true;
-                synchronized (scanner) {
+                synchronized (delegate) {
+                    if (regionScanner != null) {
+                        return regionScanner.next(resultsToReturn);
+                    }
                     do {
-                        List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
+                        List<Cell> results = useQualifierAsIndex ?
+                                new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
+                                        minMaxQualifiers.getSecond(), encodingScheme) :
+                                new ArrayList<Cell>();
                         // Results are potentially returned even when the return
                         // value of s.next is false
                         // since this is an indication of whether or not there are
                         // more values after the
                         // ones returned
-                        hasMore = scanner.nextRaw(results);
+                        hasMore = delegate.nextRaw(results);
                         if (!results.isEmpty()) {
                             result.setKeyValues(results);
                             ImmutableBytesPtr key =
-                                TupleUtil.getConcatenatedValue(result, expressions);
+                                    TupleUtil.getConcatenatedValue(result, expressions);
                             Aggregator[] rowAggregators = groupByCache.cache(key);
                             // Aggregate values here
                             aggregators.aggregate(rowAggregators, result);
                         }
-                    } while (hasMore && groupByCache.size() < limit);
+                        now = EnvironmentEdgeManager.currentTimeMillis();
+                    } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeInMs);
+                    if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeInMs) {
+                        // The page time budget is used up but the scan is not finished, so the
+                        // aggregated rows cannot be returned yet; return a dummy result instead
+                        getDummyResult(resultsToReturn);
+                        return true;
+                    }
+                    regionScanner = groupByCache.getScanner(delegate);
+                    // Do not sort here, but sort back on the client instead
+                    // The reason is that if the scan ever extends beyond a region
+                    // (which can happen if we're basing our parallelization split
+                    // points on old metadata), we'll get incorrect query results.
+                    return regionScanner.next(resultsToReturn);
                 }
-            }  finally {
+            } finally {
                 if (acquiredLock) region.closeRegionOperation();
             }
+        }
 
-            RegionScanner regionScanner = groupByCache.getScanner(scanner);
-
-            // Do not sort here, but sort back on the client instead
-            // The reason is that if the scan ever extends beyond a region
-            // (which can happen if we're basing our parallelization split
-            // points on old metadata), we'll get incorrect query results.
-            success = true;
-            return regionScanner;
-        } finally {
-            if (!success) {
+        @Override
+        public void close() throws IOException {
+            if (regionScanner != null) {
+                regionScanner.close();
+            } else {
                 Closeables.closeQuietly(groupByCache);
             }
         }
     }
-    
+
     /**
      * Used for an aggregate query in which the key order match the group by key order. In this
      * case, we can do the aggregation as we scan, by detecting when the group by key changes.
      * @param limit TODO
      * @throws IOException
      */
-    private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
-            final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
-            final ServerAggregators aggregators, final long limit) throws IOException {
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(LogUtil.addCustomAnnotations(
-                    "Grouped aggregation over ordered rows with scan " + scan + ", group by "
-                    + expressions + ", aggregators " + aggregators,
-                    ScanUtil.getCustomAnnotations(scan)));
+    private static class OrderedGroupByRegionScanner extends BaseRegionScanner {
+        private final Scan scan;
+        private final Region region;
+        private final Pair<Integer, Integer> minMaxQualifiers;
+        private final boolean useQualifierAsIndex;
+        private final PTable.QualifierEncodingScheme encodingScheme;
+        private final ServerAggregators aggregators;
+        private final long limit;
+        private final List<Expression> expressions;
+        private final long pageSizeInMs;
+        private long rowCount = 0;
+        private ImmutableBytesPtr currentKey = null;
+
+        private OrderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                            final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
+                                            final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+            super(scanner);
+            this.scan = scan;
+            this.aggregators = aggregators;
+            this.limit = limit;
+            this.pageSizeInMs = pageSizeInMs;
+            this.expressions = expressions;
+            region = c.getEnvironment().getRegion();
+            minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+            useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+            encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(LogUtil.addCustomAnnotations(
+                        "Grouped aggregation over ordered rows with scan " + scan + ", group by "
+                                + expressions + ", aggregators " + aggregators,
+                        ScanUtil.getCustomAnnotations(scan)));
+            }
         }
-        final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-        final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
-        final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-        return new BaseRegionScanner(scanner) {
-            private long rowCount = 0;
-            private ImmutableBytesPtr currentKey = null;
-
-            @Override
-            public boolean next(List<Cell> results) throws IOException {
-                boolean hasMore;
-                boolean atLimit;
-                boolean aggBoundary = false;
-                Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-                ImmutableBytesPtr key = null;
-                Aggregator[] rowAggregators = aggregators.getAggregators();
-                // If we're calculating no aggregate functions, we can exit at the
-                // start of a new row. Otherwise, we have to wait until an agg
-                int countOffset = rowAggregators.length == 0 ? 1 : 0;
-                Region region = c.getEnvironment().getRegion();
-                boolean acquiredLock = false;
-                try {
-                    region.startRegionOperation();
-                    acquiredLock = true;
-                    synchronized (scanner) {
-                        do {
-                            List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
-                            // Results are potentially returned even when the return
-                            // value of s.next is false
-                            // since this is an indication of whether or not there
-                            // are more values after the
-                            // ones returned
-                            hasMore = scanner.nextRaw(kvs);
-                            if (!kvs.isEmpty()) {
-                                result.setKeyValues(kvs);
-                                key = TupleUtil.getConcatenatedValue(result, expressions);
-                                aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
-                                if (!aggBoundary) {
-                                    aggregators.aggregate(rowAggregators, result);
-                                    if (LOGGER.isDebugEnabled()) {
-                                        LOGGER.debug(LogUtil.addCustomAnnotations(
+        @Override
+        public boolean next(List<Cell> results) throws IOException {
+            boolean hasMore;
+            boolean atLimit;
+            boolean aggBoundary = false;
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            long now;
+            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+            ImmutableBytesPtr key = null;
+            Aggregator[] rowAggregators = aggregators.getAggregators();
+            // If we're calculating no aggregate functions, we can exit at the
+            // start of a new row. Otherwise, we have to wait until an agg
+            int countOffset = rowAggregators.length == 0 ? 1 : 0;
+            boolean acquiredLock = false;
+            try {
+                region.startRegionOperation();
+                acquiredLock = true;
+                synchronized (delegate) {
+                    do {
+                        List<Cell> kvs = useQualifierAsIndex ?
+                                new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
+                                        minMaxQualifiers.getSecond(), encodingScheme) :
+                                new ArrayList<Cell>();
+                        // Results are potentially returned even when the return
+                        // value of s.next is false
+                        // since this is an indication of whether or not there
+                        // are more values after the
+                        // ones returned
+                        hasMore = delegate.nextRaw(kvs);
+                        if (!kvs.isEmpty()) {
+                            result.setKeyValues(kvs);
+                            key = TupleUtil.getConcatenatedValue(result, expressions);
+                            aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
+                            if (!aggBoundary) {
+                                aggregators.aggregate(rowAggregators, result);
+                                if (LOGGER.isDebugEnabled()) {
+                                    LOGGER.debug(LogUtil.addCustomAnnotations(
                                             "Row passed filters: " + kvs
-                                            + ", aggregated values: "
-                                            + Arrays.asList(rowAggregators),
+                                                    + ", aggregated values: "
+                                                    + Arrays.asList(rowAggregators),
                                             ScanUtil.getCustomAnnotations(scan)));
-                                    }
-                                    currentKey = key;
                                 }
+                                currentKey = key;
                             }
-                            atLimit = rowCount + countOffset >= limit;
-                            // Do rowCount + 1 b/c we don't have to wait for a complete
-                            // row in the case of a DISTINCT with a LIMIT
-                        } while (hasMore && !aggBoundary && !atLimit);
-                    }
-                } finally {
-                    if (acquiredLock) region.closeRegionOperation();
+                        }
+                        atLimit = rowCount + countOffset >= limit;
+                        // Do rowCount + 1 b/c we don't have to wait for a complete
+                        // row in the case of a DISTINCT with a LIMIT
+                        now = EnvironmentEdgeManager.currentTimeMillis();
+                    } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeInMs);
                 }
-
-                if (currentKey != null) {
-                    byte[] value = aggregators.toBytes(rowAggregators);
-                    Cell keyValue =
-                            PhoenixKeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
+            } finally {
+                if (acquiredLock) region.closeRegionOperation();
+            }
+            if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeInMs) {
+                // The page time budget is used up before reaching a group boundary, the limit,
+                // or the end of the scan, so return a dummy result instead of an aggregated row
+                getDummyResult(results);
+                return true;
+            }
+            if (currentKey != null) {
+                byte[] value = aggregators.toBytes(rowAggregators);
+                Cell keyValue =
+                        PhoenixKeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
                                 currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
                                 AGG_TIMESTAMP, value, 0, value.length);
-                    results.add(keyValue);
-                    // If we're at an aggregation boundary, reset the
-                    // aggregators and
-                    // aggregate with the current result (which is not a part of
-                    // the returned result).
-                    if (aggBoundary) {
-                        aggregators.reset(rowAggregators);
-                        aggregators.aggregate(rowAggregators, result);
-                        currentKey = key;
-                        rowCount++;
-                        atLimit |= rowCount >= limit;
-                    }
+                results.add(keyValue);
+                // If we're at an aggregation boundary, reset the
+                // aggregators and
+                // aggregate with the current result (which is not a part of
+                // the returned result).
+                if (aggBoundary) {
+                    aggregators.reset(rowAggregators);
+                    aggregators.aggregate(rowAggregators, result);
+                    currentKey = key;
+                    rowCount++;
+                    atLimit |= rowCount >= limit;
                 }
-                // Continue if there are more
-                if (!atLimit && (hasMore || aggBoundary)) {
-                    return true;
-                }
-                currentKey = null;
-                return false;
             }
-        };
+            // Continue if there are more
+            if (!atLimit && (hasMore || aggBoundary)) {
+                return true;
+            }
+            currentKey = null;
+            return false;
+        }
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index d86a27a..2c9aee4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -40,6 +40,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COU
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -153,6 +154,9 @@ public class ScanningResultIterator implements ResultIterator {
     public Tuple next() throws SQLException {
         try {
             Result result = scanner.next();
+            while (result != null && isDummy(result)) {
+                result = scanner.next();
+            }
             if (result == null) {
                 close(); // Free up resources early
                 return null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 89bb540..6c2f5ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -329,6 +329,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
     // The number of rows to be scanned in one RPC call
     public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
+    public static final String GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.grouped.aggregate_page_size_in_ms";
     // Flag indicating that server side masking of ttl expired rows is enabled.
     public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3b46cbb..a1c89f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -337,8 +337,9 @@ public class QueryServicesOptions {
 
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
-    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1000;
+    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
     public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second
+    public static final long DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000;
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index e25bdd4..a0a2cc2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STAR
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -38,9 +39,11 @@ import java.util.NavigableSet;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -809,7 +812,7 @@ public class ScanUtil {
 
     public static byte[] getPrefix(byte[] startKey, int prefixLength) {
         // If startKey is at beginning, then our prefix will be a null padded byte array
-        return startKey.length >= prefixLength ? startKey : ByteUtil.EMPTY_BYTE_ARRAY;
+        return startKey.length >= prefixLength ? startKey : EMPTY_BYTE_ARRAY;
     }
 
     private static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) {
@@ -1207,4 +1210,34 @@ public class ScanUtil {
             addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
         }
     }
+
+    public static void getDummyResult(byte[] rowKey, List<Cell> result) {
+        Cell keyValue =
+                PhoenixKeyValueUtil.newKeyValue(rowKey, 0,
+                        rowKey.length, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY,
+                        0, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length);
+        result.add(keyValue);
+    }
+
+    public static void getDummyResult(List<Cell> result) {
+        getDummyResult(EMPTY_BYTE_ARRAY, result);
+    }
+
+    public static boolean isDummy(Result result) {
+        // Check if the result is a dummy result
+        if (result.rawCells().length != 1) {
+            return false;
+        }
+        Cell cell = result.rawCells()[0];
+        return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    }
+
+    public static boolean isDummy(List<Cell> result) {
+        // Check if the result is a dummy result
+        if (result.size() != 1) {
+            return false;
+        }
+        Cell cell = result.get(0);
+        return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index ccc72ec..74fb75b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -627,11 +627,14 @@ public abstract class BaseTest {
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
         conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+        // A 0 ms page size makes the aggregate region scanners process one row in each next
+        // operation, i.e., one-row pages: the page time budget is already used up after the
+        // first row, so a 0 ms page is equivalent to a one-row page
         if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
             conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
-            // This results in processing one row at a time in each next operation of the aggregate region
-            // scanner, i.e.,  one row pages. In other words, 0ms page allows only one row to be processed
-            // within one page; 0ms page is equivalent to one-row page
+        }
+        if (conf.getLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
+            conf.setLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
         }
         return conf;
     }