You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/11/19 19:55:03 UTC
[phoenix] branch 4.x updated: PHOENIX-6207 Paged server side
grouped aggregate operations
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new ce9450d PHOENIX-6207 Paged server side grouped aggregate operations
ce9450d is described below
commit ce9450d52b49f4388573d2ad131b6d851e53a8eb
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Wed Nov 18 16:12:32 2020 -0800
PHOENIX-6207 Paged server side grouped aggregate operations
---
.../java/org/apache/phoenix/end2end/GroupByIT.java | 2 -
.../GroupedAggregateRegionObserver.java | 357 ++++++++++++---------
.../phoenix/iterate/ScanningResultIterator.java | 4 +
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../apache/phoenix/query/QueryServicesOptions.java | 4 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 36 ++-
.../java/org/apache/phoenix/query/BaseTest.java | 9 +-
7 files changed, 262 insertions(+), 151 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java
index 0b45baa..83d34eb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java
@@ -333,6 +333,4 @@ public class GroupByIT extends BaseQueryIT {
conn.close();
}
}
-
-
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 3ea04a9..0c70998 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -22,8 +22,10 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -62,6 +64,7 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
@@ -71,6 +74,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
@@ -168,11 +172,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
if (limitBytes != null) {
limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
}
+ long pageSizeInMs = Long.MAX_VALUE;
+ if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
+ byte[] pageSizeFromScan =
+ scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
+ if (pageSizeFromScan != null) {
+ pageSizeInMs = Bytes.toLong(pageSizeFromScan);
+ } else {
+ pageSizeInMs = c.getEnvironment().getConfiguration().getLong(GROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
+ QueryServicesOptions.DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
+ }
+ }
if (keyOrdered) { // Optimize by taking advantage that the rows are
// already in the required group by key order
- return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
+ return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
} else { // Otherwise, collect them all up in an in-memory map
- return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
+ return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
}
}
}
@@ -356,6 +371,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
}
+
private static final class GroupByCacheFactory {
public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();
@@ -373,198 +389,253 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals);
}
}
+
+ @Override
+ protected boolean isRegionObserverFor(Scan scan) {
+ return scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null ||
+ scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null;
+ }
+
/**
* Used for an aggregate query in which the key order does not necessarily match the group by
* key order. In this case, we must collect all distinct groups within a region into a map,
* aggregating as we go.
* @param limit TODO
*/
- private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
- final RegionScanner scanner, final List<Expression> expressions,
- final ServerAggregators aggregators, long limit) throws IOException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(LogUtil.addCustomAnnotations(
- "Grouped aggregation over unordered rows with scan " + scan
- + ", group by " + expressions + ", aggregators " + aggregators,
- ScanUtil.getCustomAnnotations(scan)));
- }
- RegionCoprocessorEnvironment env = c.getEnvironment();
- Configuration conf = env.getConfiguration();
- int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
- byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
- if (estDistValsBytes != null) {
- // Allocate 1.5x estimation
- estDistVals = Math.max(MIN_DISTINCT_VALUES,
- (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
- }
-
- Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
- boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
- final boolean spillableEnabled =
- conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
- final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-
- GroupByCache groupByCache =
- GroupByCacheFactory.INSTANCE.newCache(
- env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
- aggregators, estDistVals);
- boolean success = false;
- try {
- boolean hasMore;
- Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+
+ private static class UnorderedGroupByRegionScanner extends BaseRegionScanner {
+ private final Region region;
+ private final Pair<Integer, Integer> minMaxQualifiers;
+ private final boolean useQualifierAsIndex;
+ private final PTable.QualifierEncodingScheme encodingScheme;
+ private final ServerAggregators aggregators;
+ private final long limit;
+ private final List<Expression> expressions;
+ private final long pageSizeInMs;
+ private RegionScanner regionScanner = null;
+ private final boolean spillableEnabled;
+ private final GroupByCache groupByCache;
+
+ private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
+ final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+ super(scanner);
+ this.region = c.getEnvironment().getRegion();
+ minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+ useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+ encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ this.aggregators = aggregators;
+ this.limit = limit;
+ this.pageSizeInMs = pageSizeInMs;
+ this.expressions = expressions;
+ RegionCoprocessorEnvironment env = c.getEnvironment();
+ Configuration conf = env.getConfiguration();
+ int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
+ byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
+ if (estDistValsBytes != null) {
+ // Allocate 1.5x estimation
+ estDistVals = Math.max(MIN_DISTINCT_VALUES,
+ (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
+ }
+
+ spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
+ groupByCache = GroupByCacheFactory.INSTANCE.newCache(env, ScanUtil.getTenantId(scan),
+ ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
+ "Grouped aggregation over unordered rows with scan " + scan
+ + ", group by " + expressions + ", aggregators " + aggregators,
+ ScanUtil.getCustomAnnotations(scan)));
+ LOGGER.debug(LogUtil.addCustomAnnotations(
"Spillable groupby enabled: " + spillableEnabled,
ScanUtil.getCustomAnnotations(scan)));
}
- Region region = c.getEnvironment().getRegion();
+ }
+
+ @Override
+ public boolean next(List<Cell> resultsToReturn) throws IOException {
+ boolean hasMore;
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long now;
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
boolean acquiredLock = false;
try {
region.startRegionOperation();
acquiredLock = true;
- synchronized (scanner) {
+ synchronized (delegate) {
+ if (regionScanner != null) {
+ return regionScanner.next(resultsToReturn);
+ }
do {
- List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
+ List<Cell> results = useQualifierAsIndex ?
+ new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
+ minMaxQualifiers.getSecond(), encodingScheme) :
+ new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there are
// more values after the
// ones returned
- hasMore = scanner.nextRaw(results);
+ hasMore = delegate.nextRaw(results);
if (!results.isEmpty()) {
result.setKeyValues(results);
ImmutableBytesPtr key =
- TupleUtil.getConcatenatedValue(result, expressions);
+ TupleUtil.getConcatenatedValue(result, expressions);
Aggregator[] rowAggregators = groupByCache.cache(key);
// Aggregate values here
aggregators.aggregate(rowAggregators, result);
}
- } while (hasMore && groupByCache.size() < limit);
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeInMs);
+ if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeInMs) {
+ // Return a dummy result as we have processed a page worth of rows
+ // but we are not ready to aggregate
+ getDummyResult(resultsToReturn);
+ return true;
+ }
+ regionScanner = groupByCache.getScanner(delegate);
+ // Do not sort here, but sort back on the client instead
+ // The reason is that if the scan ever extends beyond a region
+ // (which can happen if we're basing our parallelization split
+ // points on old metadata), we'll get incorrect query results.
+ return regionScanner.next(resultsToReturn);
}
- } finally {
+ } finally {
if (acquiredLock) region.closeRegionOperation();
}
+ }
- RegionScanner regionScanner = groupByCache.getScanner(scanner);
-
- // Do not sort here, but sort back on the client instead
- // The reason is that if the scan ever extends beyond a region
- // (which can happen if we're basing our parallelization split
- // points on old metadata), we'll get incorrect query results.
- success = true;
- return regionScanner;
- } finally {
- if (!success) {
+ @Override
+ public void close() throws IOException {
+ if (regionScanner != null) {
+ regionScanner.close();
+ } else {
Closeables.closeQuietly(groupByCache);
}
}
}
-
+
/**
* Used for an aggregate query in which the key order match the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
* @param limit TODO
* @throws IOException
*/
- private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
- final ServerAggregators aggregators, final long limit) throws IOException {
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(LogUtil.addCustomAnnotations(
- "Grouped aggregation over ordered rows with scan " + scan + ", group by "
- + expressions + ", aggregators " + aggregators,
- ScanUtil.getCustomAnnotations(scan)));
+
+ private static class OrderedGroupByRegionScanner extends BaseRegionScanner {
+ private final Scan scan;
+ private final Region region;
+ private final Pair<Integer, Integer> minMaxQualifiers;
+ private final boolean useQualifierAsIndex;
+ private final PTable.QualifierEncodingScheme encodingScheme;
+ private final ServerAggregators aggregators;
+ private final long limit;
+ private Aggregator[] rowAggregators;
+ private final List<Expression> expressions;
+ private final long pageSizeInMs;
+ private long rowCount = 0;
+ private ImmutableBytesPtr currentKey = null;
+
+ private OrderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
+ final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+ super(scanner);
+ this.scan = scan;
+ this.region = c.getEnvironment().getRegion();
+ minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+ useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+ encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ this.aggregators = aggregators;
+ rowAggregators = aggregators.getAggregators();
+ this.limit = limit;
+ this.pageSizeInMs = pageSizeInMs;
+ this.expressions = expressions;
}
- final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
- final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
- final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
- return new BaseRegionScanner(scanner) {
- private long rowCount = 0;
- private ImmutableBytesPtr currentKey = null;
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- boolean hasMore;
- boolean atLimit;
- boolean aggBoundary = false;
- Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
- ImmutableBytesPtr key = null;
- Aggregator[] rowAggregators = aggregators.getAggregators();
- // If we're calculating no aggregate functions, we can exit at the
- // start of a new row. Otherwise, we have to wait until an agg
- int countOffset = rowAggregators.length == 0 ? 1 : 0;
- Region region = c.getEnvironment().getRegion();
- boolean acquiredLock = false;
- try {
- region.startRegionOperation();
- acquiredLock = true;
- synchronized (scanner) {
- do {
- List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
- // Results are potentially returned even when the return
- // value of s.next is false
- // since this is an indication of whether or not there
- // are more values after the
- // ones returned
- hasMore = scanner.nextRaw(kvs);
- if (!kvs.isEmpty()) {
- result.setKeyValues(kvs);
- key = TupleUtil.getConcatenatedValue(result, expressions);
- aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
- if (!aggBoundary) {
- aggregators.aggregate(rowAggregators, result);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(LogUtil.addCustomAnnotations(
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ boolean hasMore;
+ boolean atLimit;
+ boolean aggBoundary = false;
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long now;
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+ ImmutableBytesPtr key = null;
+
+ // If we're calculating no aggregate functions, we can exit at the
+ // start of a new row. Otherwise, we have to wait until an agg
+ int countOffset = rowAggregators.length == 0 ? 1 : 0;
+ boolean acquiredLock = false;
+ try {
+ region.startRegionOperation();
+ acquiredLock = true;
+ synchronized (delegate) {
+ do {
+ List<Cell> kvs = useQualifierAsIndex ?
+ new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
+ minMaxQualifiers.getSecond(), encodingScheme) :
+ new ArrayList<Cell>();
+ // Results are potentially returned even when the return
+ // value of s.next is false
+ // since this is an indication of whether or not there
+ // are more values after the
+ // ones returned
+ hasMore = delegate.nextRaw(kvs);
+ if (!kvs.isEmpty()) {
+ result.setKeyValues(kvs);
+ key = TupleUtil.getConcatenatedValue(result, expressions);
+ aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
+ if (!aggBoundary) {
+ aggregators.aggregate(rowAggregators, result);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(LogUtil.addCustomAnnotations(
"Row passed filters: " + kvs
- + ", aggregated values: "
- + Arrays.asList(rowAggregators),
+ + ", aggregated values: "
+ + Arrays.asList(rowAggregators),
ScanUtil.getCustomAnnotations(scan)));
- }
- currentKey = key;
}
+ currentKey = key;
}
- atLimit = rowCount + countOffset >= limit;
- // Do rowCount + 1 b/c we don't have to wait for a complete
- // row in the case of a DISTINCT with a LIMIT
- } while (hasMore && !aggBoundary && !atLimit);
- }
- } finally {
- if (acquiredLock) region.closeRegionOperation();
+ }
+ atLimit = rowCount + countOffset >= limit;
+ // Do rowCount + 1 b/c we don't have to wait for a complete
+ // row in the case of a DISTINCT with a LIMIT
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeInMs);
}
-
- if (currentKey != null) {
- byte[] value = aggregators.toBytes(rowAggregators);
- KeyValue keyValue =
- KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
+ } finally {
+ if (acquiredLock) region.closeRegionOperation();
+ }
+ if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeInMs) {
+ // The page time budget ran out before the current group was complete.
+ // Return a dummy result as we have processed a page worth of rows
+ // but we are not ready to aggregate. The partial aggregation state
+ // (currentKey, rowAggregators, rowCount) is kept in fields, so the
+ // next call resumes where this one stopped. The dummy cell must be
+ // added explicitly: the unordered scanner does the same in this
+ // situation, and ScanningResultIterator skips dummy results on the client.
+ getDummyResult(results);
+ return true;
+ }
+ if (currentKey != null) {
+ byte[] value = aggregators.toBytes(rowAggregators);
+ KeyValue keyValue =
+ KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
AGG_TIMESTAMP, value, 0, value.length);
- results.add(keyValue);
- // If we're at an aggregation boundary, reset the
- // aggregators and
- // aggregate with the current result (which is not a part of
- // the returned result).
- if (aggBoundary) {
- aggregators.reset(rowAggregators);
- aggregators.aggregate(rowAggregators, result);
- currentKey = key;
- rowCount++;
- atLimit |= rowCount >= limit;
- }
- }
- // Continue if there are more
- if (!atLimit && (hasMore || aggBoundary)) {
- return true;
+ results.add(keyValue);
+ // If we're at an aggregation boundary, reset the
+ // aggregators and
+ // aggregate with the current result (which is not a part of
+ // the returned result).
+ if (aggBoundary) {
+ aggregators.reset(rowAggregators);
+ aggregators.aggregate(rowAggregators, result);
+ currentKey = key;
+ rowCount++;
+ atLimit |= rowCount >= limit;
}
- currentKey = null;
- return false;
}
- };
- }
-
- @Override
- protected boolean isRegionObserverFor(Scan scan) {
- return scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null ||
- scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null;
+ // Continue if there are more
+ if (!atLimit && (hasMore || aggBoundary)) {
+ return true;
+ }
+ currentKey = null;
+ return false;
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 76db0d4..6c521c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -40,6 +40,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COU
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
import java.io.IOException;
import java.sql.SQLException;
@@ -157,6 +158,9 @@ public class ScanningResultIterator implements ResultIterator {
public Tuple next() throws SQLException {
try {
Result result = scanner.next();
+ while (result != null && isDummy(result)) {
+ result = scanner.next();
+ }
if (result == null) {
close(); // Free up resources early
return null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 748c423..52cb2b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -332,6 +332,7 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
// The number of rows to be scanned in one RPC call
public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
+ public static final String GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.grouped.aggregate_page_size_in_ms";
// Flag indicating that server side masking of ttl expired rows is enabled.
public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index c36361d..289e3fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -341,9 +341,9 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
- public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1000;
+ public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second
-
+ public static final long DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000;
public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
public static final boolean DEFAULT_PROPERTY_POLICY_PROVIDER_ENABLED = true;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index fe8c06d..cd1877f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -20,9 +20,12 @@ package org.apache.phoenix.util;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
@@ -91,6 +94,7 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STAR
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
/**
*
@@ -807,7 +811,7 @@ public class ScanUtil {
public static byte[] getPrefix(byte[] startKey, int prefixLength) {
// If startKey is at beginning, then our prefix will be a null padded byte array
- return startKey.length >= prefixLength ? startKey : ByteUtil.EMPTY_BYTE_ARRAY;
+ return startKey.length >= prefixLength ? startKey : EMPTY_BYTE_ARRAY;
}
private static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) {
@@ -1205,4 +1209,34 @@ public class ScanUtil {
addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
}
}
+
+ public static void getDummyResult(byte[] rowKey, List<Cell> result) {
+ KeyValue keyValue =
+ KeyValueUtil.newKeyValue(rowKey, 0,
+ rowKey.length, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY,
+ 0, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length);
+ result.add(keyValue);
+ }
+
+ public static void getDummyResult(List<Cell> result) {
+ getDummyResult(EMPTY_BYTE_ARRAY, result);
+ }
+
+ public static boolean isDummy(Result result) {
+ // Check if the result is a dummy result
+ if (result.rawCells().length != 1) {
+ return false;
+ }
+ Cell cell = result.rawCells()[0];
+ return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ }
+
+ public static boolean isDummy(List<Cell> result) {
+ // Check if the result is a dummy result
+ if (result.size() != 1) {
+ return false;
+ }
+ Cell cell = result.get(0);
+ return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ }
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 5320b66..e3e7a9f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -626,11 +626,14 @@ public abstract class BaseTest {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+ // This results in processing one row at a time in each next operation of the aggregate region
+ // scanner, i.e., one row pages. In other words, 0ms page allows only one row to be processed
+ // within one page; 0ms page is equivalent to one-row page
if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
- // This results in processing one row at a time in each next operation of the aggregate region
- // scanner, i.e., one row pages. In other words, 0ms page allows only one row to be processed
- // within one page; 0ms page is equivalent to one-row page
+ }
+ if (conf.getLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
+ conf.setLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
}
return conf;
}