You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/16 02:18:22 UTC
(pinot) branch master updated: Add query option override for Broker MinGroupTrimSize (#11984)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1ab9e62b6d Add query option override for Broker MinGroupTrimSize (#11984)
1ab9e62b6d is described below
commit 1ab9e62b6d1908dda598eaaff9f6a2d8cc5a7b65
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Wed Nov 15 18:18:16 2023 -0800
Add query option override for Broker MinGroupTrimSize (#11984)
---
.../pinot/common/utils/config/QueryOptionsUtils.java | 6 ++++++
.../pinot/core/query/reduce/BrokerReduceService.java | 14 +++++++++++++-
.../pinot/core/query/reduce/StreamingReduceService.java | 13 ++++++++++++-
.../java/org/apache/pinot/spi/utils/CommonConstants.java | 1 +
4 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index f2589d7f37..8d93184c35 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -173,6 +173,12 @@ public class QueryOptionsUtils {
return minServerGroupTrimSizeString != null ? Integer.parseInt(minServerGroupTrimSizeString) : null;
}
+ @Nullable
+ public static Integer getMinBrokerGroupTrimSize(Map<String, String> queryOptions) {
+ String minBrokerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE);
+ return minBrokerGroupTrimSizeString != null ? Integer.parseInt(minBrokerGroupTrimSizeString) : null;
+ }
+
public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 8eee542557..c664a78e67 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -142,10 +143,21 @@ public class BrokerReduceService extends BaseReduceService {
QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
+
+ Integer minGroupTrimSizeQueryOption = null;
+ Integer groupTrimThresholdQueryOption = null;
+ if (queryOptions != null) {
+ minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+ groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+ }
+ int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
+ int groupTrimThreshold =
+ groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;
+
try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
- _groupByTrimThreshold, _minGroupTrimSize), brokerMetrics);
+ groupTrimThreshold, minGroupTrimSize), brokerMetrics);
} catch (EarlyTerminationException e) {
brokerResponseNative.addToExceptions(
new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index f84c7bcefe..f3a04f1857 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -78,10 +79,20 @@ public class StreamingReduceService extends BaseReduceService {
// initialize empty response.
ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
+ Integer minGroupTrimSizeQueryOption = null;
+ Integer groupTrimThresholdQueryOption = null;
+ if (queryOptions != null) {
+ minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+ groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+ }
+ int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
+ int groupTrimThreshold =
+ groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;
+
// Process server response.
DataTableReducerContext dataTableReducerContext =
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
- _groupByTrimThreshold, _minGroupTrimSize);
+ groupTrimThreshold, minGroupTrimSize);
StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
streamingReducer.init(dataTableReducerContext);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ec8c2f7724..09fe5129e0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -347,6 +347,7 @@ public class CommonConstants {
public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads";
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize";
public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize";
+ public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize";
public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org