You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/09 00:56:19 UTC

(pinot) branch master updated: Make groupBy trim size configurable at Broker (#11958)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 330938fe4c Make groupBy trim size configurable at Broker (#11958)
330938fe4c is described below

commit 330938fe4c5c331f98dc62658ca5f467a5621ca7
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Wed Nov 8 16:56:14 2023 -0800

    Make groupBy trim size configurable at Broker (#11958)
---
 .../org/apache/pinot/core/query/reduce/BaseReduceService.java     | 3 +++
 .../org/apache/pinot/core/query/reduce/BrokerReduceService.java   | 2 +-
 .../apache/pinot/core/query/reduce/DataTableReducerContext.java   | 8 +++++++-
 .../apache/pinot/core/query/reduce/GroupByDataTableReducer.java   | 3 +--
 .../apache/pinot/core/query/reduce/StreamingReduceService.java    | 2 +-
 .../pinot/integration/tests/OfflineGRPCServerIntegrationTest.java | 2 +-
 .../src/main/java/org/apache/pinot/spi/utils/CommonConstants.java | 2 ++
 7 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 6de874dbda..9b44e0c405 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -51,12 +51,15 @@ public abstract class BaseReduceService {
   protected final ExecutorService _reduceExecutorService;
   protected final int _maxReduceThreadsPerQuery;
   protected final int _groupByTrimThreshold;
+  protected final int _minGroupTrimSize;
 
   public BaseReduceService(PinotConfiguration config) {
     _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
         CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
     _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
         CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
+    _minGroupTrimSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE,
+        CommonConstants.Broker.DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE);
 
     int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
     LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 93cbecd5bb..8eee542557 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -145,7 +145,7 @@ public class BrokerReduceService extends BaseReduceService {
     try {
       dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
           new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
-              _groupByTrimThreshold), brokerMetrics);
+              _groupByTrimThreshold, _minGroupTrimSize), brokerMetrics);
     } catch (EarlyTerminationException e) {
       brokerResponseNative.addToExceptions(
           new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
index d4946df81a..d4b69e6c21 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
@@ -31,6 +31,7 @@ public class DataTableReducerContext {
   private final long _reduceTimeOutMs;
   // used for SQL GROUP BY
   private final int _groupByTrimThreshold;
+  private final int _minGroupTrimSize;
 
   /**
    * Constructor for the class.
@@ -41,11 +42,12 @@ public class DataTableReducerContext {
    * @param groupByTrimThreshold trim threshold for SQL group by
    */
   public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs,
-      int groupByTrimThreshold) {
+      int groupByTrimThreshold, int minGroupTrimSize) {
     _executorService = executorService;
     _maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
     _reduceTimeOutMs = reduceTimeOutMs;
     _groupByTrimThreshold = groupByTrimThreshold;
+    _minGroupTrimSize = minGroupTrimSize;
   }
 
   public ExecutorService getExecutorService() {
@@ -63,4 +65,8 @@ public class DataTableReducerContext {
   public int getGroupByTrimThreshold() {
     return _groupByTrimThreshold;
   }
+
+  public int getMinGroupTrimSize() {
+    return _minGroupTrimSize;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 844d295892..46cd73e947 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -249,8 +249,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
     // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
     int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
     int limit = _queryContext.getLimit();
-    // TODO: Make minTrimSize configurable
-    int trimSize = GroupByUtils.getTableCapacity(limit);
+    int trimSize = GroupByUtils.getTableCapacity(limit, reducerContext.getMinGroupTrimSize());
     // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
     // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
     int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index 3a01dd05d3..f84c7bcefe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -81,7 +81,7 @@ public class StreamingReduceService extends BaseReduceService {
     // Process server response.
     DataTableReducerContext dataTableReducerContext =
         new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
-            _groupByTrimThreshold);
+            _groupByTrimThreshold, _minGroupTrimSize);
     StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
 
     streamingReducer.init(dataTableReducerContext);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 3bda4b17f2..5ea826116b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -60,7 +60,7 @@ import static org.testng.Assert.*;
 public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest {
   private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
   private static final DataTableReducerContext DATATABLE_REDUCER_CONTEXT = new DataTableReducerContext(
-      EXECUTOR_SERVICE, 2, 10000, 10000);
+      EXECUTOR_SERVICE, 2, 10000, 10000, 5000);
 
   @BeforeClass
   public void setUp()
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 568b1e1505..ec8c2f7724 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -276,6 +276,8 @@ public class CommonConstants {
     // used for SQL GROUP BY during broker reduce
     public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
     public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+    public static final String CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE = "pinot.broker.min.group.trim.size";
+    public static final int DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE = 5000;
 
     // Configure the request handler type used by broker to handler inbound query request.
     // NOTE: the request handler type refers to the communication between Broker and Server.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org