You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/18 21:01:36 UTC
[pinot] branch master updated: Prevent over allocating for group-by reduce (#8921)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b791cd10fe Prevent over allocating for group-by reduce (#8921)
b791cd10fe is described below
commit b791cd10febd94b990ade4b21bfe6ffc72350491
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sat Jun 18 14:01:30 2022 -0700
Prevent over allocating for group-by reduce (#8921)
- Allocate the rows array based on both the LIMIT and the number of records returned
- Short-circuit the calculation when no records are returned, or when the LIMIT is 0
---
.../core/query/reduce/GroupByDataTableReducer.java | 32 ++++++++++++++++------
1 file changed, 24 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 054dbe15d6..db0ad5a80f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -113,6 +113,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
throws TimeoutException {
+ int numRecords;
Iterator<Record> sortedIterator;
if (!dataTables.isEmpty()) {
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
@@ -120,20 +121,32 @@ public class GroupByDataTableReducer implements DataTableReducer {
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
}
+ numRecords = indexedTable.size();
sortedIterator = indexedTable.iterator();
} else {
+ numRecords = 0;
sortedIterator = Collections.emptyIterator();
}
DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
- ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
- int numColumns = columnDataTypes.length;
- int limit = _queryContext.getLimit();
- List<Object[]> rows = new ArrayList<>(limit);
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
+ DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+
+ // Directly return when there is no record returned, or limit is 0
+ int limit = _queryContext.getLimit();
+ if (numRecords == 0 || limit == 0) {
+ brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
+ return;
+ }
+
+ // Calculate rows before post-aggregation
+ List<Object[]> rows;
+ ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
+ int numColumns = columnDataTypes.length;
FilterContext havingFilter = _queryContext.getHavingFilter();
if (havingFilter != null) {
+ rows = new ArrayList<>();
HavingFilterHandler havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
while (rows.size() < limit && sortedIterator.hasNext()) {
Object[] row = sortedIterator.next().getValues();
@@ -146,7 +159,9 @@ public class GroupByDataTableReducer implements DataTableReducer {
}
}
} else {
- for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
+ int numRows = Math.min(numRecords, limit);
+ rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
for (int j = 0; j < numColumns; j++) {
@@ -155,11 +170,11 @@ public class GroupByDataTableReducer implements DataTableReducer {
rows.add(row);
}
}
- DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+
+ // Calculate final result rows after post aggregation
+ List<Object[]> resultRows = new ArrayList<>(rows.size());
ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
int numResultColumns = resultColumnDataTypes.length;
- int numResultRows = rows.size();
- List<Object[]> resultRows = new ArrayList<>(numResultRows);
for (Object[] row : rows) {
Object[] resultRow = postAggregationHandler.getResult(row);
for (int i = 0; i < numResultColumns; i++) {
@@ -167,6 +182,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
}
resultRows.add(resultRow);
}
+
brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org