Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/18 21:01:36 UTC

[pinot] branch master updated: Prevent over allocating for group-by reduce (#8921)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b791cd10fe Prevent over allocating for group-by reduce (#8921)
b791cd10fe is described below

commit b791cd10febd94b990ade4b21bfe6ffc72350491
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sat Jun 18 14:01:30 2022 -0700

    Prevent over allocating for group-by reduce (#8921)
    
    - Allocate array based on both the LIMIT and number of records returned
    - Short-circuit the calculation when there is no record returned, or LIMIT is 0
---
 .../core/query/reduce/GroupByDataTableReducer.java | 32 ++++++++++++++++------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 054dbe15d6..db0ad5a80f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -113,6 +113,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
       Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
       BrokerMetrics brokerMetrics)
       throws TimeoutException {
+    int numRecords;
     Iterator<Record> sortedIterator;
     if (!dataTables.isEmpty()) {
       IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
@@ -120,20 +121,32 @@ public class GroupByDataTableReducer implements DataTableReducer {
         brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
         brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
       }
+      numRecords = indexedTable.size();
       sortedIterator = indexedTable.iterator();
     } else {
+      numRecords = 0;
       sortedIterator = Collections.emptyIterator();
     }
 
     DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
-    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
-    int numColumns = columnDataTypes.length;
-    int limit = _queryContext.getLimit();
-    List<Object[]> rows = new ArrayList<>(limit);
     PostAggregationHandler postAggregationHandler =
         new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+
+    // Directly return when there is no record returned, or limit is 0
+    int limit = _queryContext.getLimit();
+    if (numRecords == 0 || limit == 0) {
+      brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
+      return;
+    }
+
+    // Calculate rows before post-aggregation
+    List<Object[]> rows;
+    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
     FilterContext havingFilter = _queryContext.getHavingFilter();
     if (havingFilter != null) {
+      rows = new ArrayList<>();
       HavingFilterHandler havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
       while (rows.size() < limit && sortedIterator.hasNext()) {
         Object[] row = sortedIterator.next().getValues();
@@ -146,7 +159,9 @@ public class GroupByDataTableReducer implements DataTableReducer {
         }
       }
     } else {
-      for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
+      int numRows = Math.min(numRecords, limit);
+      rows = new ArrayList<>(numRows);
+      for (int i = 0; i < numRows; i++) {
         Object[] row = sortedIterator.next().getValues();
         extractFinalAggregationResults(row);
         for (int j = 0; j < numColumns; j++) {
@@ -155,11 +170,11 @@ public class GroupByDataTableReducer implements DataTableReducer {
         rows.add(row);
       }
     }
-    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+
+    // Calculate final result rows after post aggregation
+    List<Object[]> resultRows = new ArrayList<>(rows.size());
     ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
     int numResultColumns = resultColumnDataTypes.length;
-    int numResultRows = rows.size();
-    List<Object[]> resultRows = new ArrayList<>(numResultRows);
     for (Object[] row : rows) {
       Object[] resultRow = postAggregationHandler.getResult(row);
       for (int i = 0; i < numResultColumns; i++) {
@@ -167,6 +182,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
       }
       resultRows.add(resultRow);
     }
+
     brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
   }
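
For reference, here is a minimal standalone sketch of the sizing pattern the diff applies: short-circuit when there are no records or the LIMIT is 0, and size the row list from min(numRecords, limit) rather than from the limit alone. The class and method names below are hypothetical illustrations, not the actual Pinot API; the real logic lives in GroupByDataTableReducer as shown in the diff above.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical sketch of the allocation pattern from this commit;
// the real implementation is in GroupByDataTableReducer (see the diff above).
public class GroupByReduceSizingSketch {

  // Collect up to 'limit' rows from a sorted iterator over 'numRecords' records.
  static List<Object[]> collectRows(Iterator<Object[]> sortedIterator, int numRecords, int limit) {
    // Short-circuit: nothing to reduce when there are no records or the query has LIMIT 0.
    if (numRecords == 0 || limit == 0) {
      return Collections.emptyList();
    }
    // Size the backing array from both the record count and the limit, instead of
    // the limit alone, so a huge LIMIT on a small result does not over-allocate.
    int numRows = Math.min(numRecords, limit);
    List<Object[]> rows = new ArrayList<>(numRows);
    for (int i = 0; i < numRows; i++) {
      rows.add(sortedIterator.next());
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Object[]> records = List.of(new Object[]{"a", 1L}, new Object[]{"b", 2L});
    // With LIMIT 1_000_000 but only 2 records, the list is allocated for 2 entries.
    List<Object[]> rows = collectRows(records.iterator(), records.size(), 1_000_000);
    System.out.println("rows collected: " + rows.size());
  }
}

Sizing from min(numRecords, limit) matters because a query can legitimately carry a very large LIMIT while the servers return only a handful of groups; allocating from the limit alone would reserve memory that is never used.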
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org