You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/09 00:56:19 UTC
(pinot) branch master updated: Make groupBy trim size configurable at Broker (#11958)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 330938fe4c Make groupBy trim size configurable at Broker (#11958)
330938fe4c is described below
commit 330938fe4c5c331f98dc62658ca5f467a5621ca7
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Wed Nov 8 16:56:14 2023 -0800
Make groupBy trim size configurable at Broker (#11958)
---
.../org/apache/pinot/core/query/reduce/BaseReduceService.java | 3 +++
.../org/apache/pinot/core/query/reduce/BrokerReduceService.java | 2 +-
.../apache/pinot/core/query/reduce/DataTableReducerContext.java | 8 +++++++-
.../apache/pinot/core/query/reduce/GroupByDataTableReducer.java | 3 +--
.../apache/pinot/core/query/reduce/StreamingReduceService.java | 2 +-
.../pinot/integration/tests/OfflineGRPCServerIntegrationTest.java | 2 +-
.../src/main/java/org/apache/pinot/spi/utils/CommonConstants.java | 2 ++
7 files changed, 16 insertions(+), 6 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 6de874dbda..9b44e0c405 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -51,12 +51,15 @@ public abstract class BaseReduceService {
protected final ExecutorService _reduceExecutorService;
protected final int _maxReduceThreadsPerQuery;
protected final int _groupByTrimThreshold;
+ protected final int _minGroupTrimSize;
public BaseReduceService(PinotConfiguration config) {
_maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
_groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
+ _minGroupTrimSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE,
+ CommonConstants.Broker.DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE);
int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 93cbecd5bb..8eee542557 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -145,7 +145,7 @@ public class BrokerReduceService extends BaseReduceService {
try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
- _groupByTrimThreshold), brokerMetrics);
+ _groupByTrimThreshold, _minGroupTrimSize), brokerMetrics);
} catch (EarlyTerminationException e) {
brokerResponseNative.addToExceptions(
new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
index d4946df81a..d4b69e6c21 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
@@ -31,6 +31,7 @@ public class DataTableReducerContext {
private final long _reduceTimeOutMs;
// used for SQL GROUP BY
private final int _groupByTrimThreshold;
+ private final int _minGroupTrimSize;
/**
* Constructor for the class.
@@ -41,11 +42,12 @@ public class DataTableReducerContext {
* @param groupByTrimThreshold trim threshold for SQL group by
*/
public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs,
- int groupByTrimThreshold) {
+ int groupByTrimThreshold, int minGroupTrimSize) {
_executorService = executorService;
_maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
_reduceTimeOutMs = reduceTimeOutMs;
_groupByTrimThreshold = groupByTrimThreshold;
+ _minGroupTrimSize = minGroupTrimSize;
}
public ExecutorService getExecutorService() {
@@ -63,4 +65,8 @@ public class DataTableReducerContext {
public int getGroupByTrimThreshold() {
return _groupByTrimThreshold;
}
+
+ public int getMinGroupTrimSize() {
+ return _minGroupTrimSize;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 844d295892..46cd73e947 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -249,8 +249,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
// In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
int limit = _queryContext.getLimit();
- // TODO: Make minTrimSize configurable
- int trimSize = GroupByUtils.getTableCapacity(limit);
+ int trimSize = GroupByUtils.getTableCapacity(limit, reducerContext.getMinGroupTrimSize());
// NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
// TODO: Resolve the HAVING clause within the IndexedTable before returning the result
int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index 3a01dd05d3..f84c7bcefe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -81,7 +81,7 @@ public class StreamingReduceService extends BaseReduceService {
// Process server response.
DataTableReducerContext dataTableReducerContext =
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
- _groupByTrimThreshold);
+ _groupByTrimThreshold, _minGroupTrimSize);
StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
streamingReducer.init(dataTableReducerContext);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 3bda4b17f2..5ea826116b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -60,7 +60,7 @@ import static org.testng.Assert.*;
public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
private static final DataTableReducerContext DATATABLE_REDUCER_CONTEXT = new DataTableReducerContext(
- EXECUTOR_SERVICE, 2, 10000, 10000);
+ EXECUTOR_SERVICE, 2, 10000, 10000, 5000);
@BeforeClass
public void setUp()
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 568b1e1505..ec8c2f7724 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -276,6 +276,8 @@ public class CommonConstants {
// used for SQL GROUP BY during broker reduce
public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+ public static final String CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE = "pinot.broker.min.group.trim.size";
+ public static final int DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE = 5000;
// Configure the request handler type used by broker to handler inbound query request.
// NOTE: the request handler type refers to the communication between Broker and Server.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org