You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/16 02:18:22 UTC

(pinot) branch master updated: Add query option override for Broker MinGroupTrimSize (#11984)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab9e62b6d Add query option override for Broker MinGroupTrimSize (#11984)
1ab9e62b6d is described below

commit 1ab9e62b6d1908dda598eaaff9f6a2d8cc5a7b65
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Wed Nov 15 18:18:16 2023 -0800

    Add query option override for Broker MinGroupTrimSize (#11984)
---
 .../pinot/common/utils/config/QueryOptionsUtils.java       |  6 ++++++
 .../pinot/core/query/reduce/BrokerReduceService.java       | 14 +++++++++++++-
 .../pinot/core/query/reduce/StreamingReduceService.java    | 13 ++++++++++++-
 .../java/org/apache/pinot/spi/utils/CommonConstants.java   |  1 +
 4 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index f2589d7f37..8d93184c35 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -173,6 +173,12 @@ public class QueryOptionsUtils {
     return minServerGroupTrimSizeString != null ? Integer.parseInt(minServerGroupTrimSizeString) : null;
   }
 
+  @Nullable
+  public static Integer getMinBrokerGroupTrimSize(Map<String, String> queryOptions) {
+    String minBrokerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE);
+    return minBrokerGroupTrimSizeString != null ? Integer.parseInt(minBrokerGroupTrimSizeString) : null;
+  }
+
   public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 8eee542557..c664a78e67 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -142,10 +143,21 @@ public class BrokerReduceService extends BaseReduceService {
 
     QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
     DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
+
+    Integer minGroupTrimSizeQueryOption = null;
+    Integer groupTrimThresholdQueryOption = null;
+    if (queryOptions != null) {
+      minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+      groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+    }
+    int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
+    int groupTrimThreshold =
+        groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;
+
     try {
       dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
           new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
-              _groupByTrimThreshold, _minGroupTrimSize), brokerMetrics);
+              groupTrimThreshold, minGroupTrimSize), brokerMetrics);
     } catch (EarlyTerminationException e) {
       brokerResponseNative.addToExceptions(
           new QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, e.toString()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index f84c7bcefe..f3a04f1857 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -78,10 +79,20 @@ public class StreamingReduceService extends BaseReduceService {
     // initialize empty response.
     ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
 
+    Integer minGroupTrimSizeQueryOption = null;
+    Integer groupTrimThresholdQueryOption = null;
+    if (queryOptions != null) {
+      minGroupTrimSizeQueryOption = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+      groupTrimThresholdQueryOption = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+    }
+    int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? minGroupTrimSizeQueryOption : _minGroupTrimSize;
+    int groupTrimThreshold =
+        groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption : _groupByTrimThreshold;
+
     // Process server response.
     DataTableReducerContext dataTableReducerContext =
         new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
-            _groupByTrimThreshold, _minGroupTrimSize);
+            groupTrimThreshold, minGroupTrimSize);
     StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
 
     streamingReducer.init(dataTableReducerContext);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ec8c2f7724..09fe5129e0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -347,6 +347,7 @@ public class CommonConstants {
         public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads";
         public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize";
         public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize";
+        public static final String MIN_BROKER_GROUP_TRIM_SIZE = "minBrokerGroupTrimSize";
         public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery";
         public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
         public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org