You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/06 21:57:47 UTC

[incubator-pinot] branch master updated: Fix the potential overflow for numGroupsLimit in CombineGroupByOperator (#4285)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb08648  Fix the potential overflow for numGroupsLimit in CombineGroupByOperator (#4285)
fb08648 is described below

commit fb086485d3a092914942c579b630a46ca2f22ac5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jun 6 14:57:42 2019 -0700

    Fix the potential overflow for numGroupsLimit in CombineGroupByOperator (#4285)
---
 .../pinot/core/operator/CombineGroupByOperator.java      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index 99cf0c3..64ddc4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -64,18 +64,21 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
   private final BrokerRequest _brokerRequest;
   private final ExecutorService _executorService;
   private final long _timeOutMs;
-  // Limit on number of groups stored for each segment, beyond which no new group will be created
-  private final int _numGroupsLimit;
+  // Limit on number of groups stored, beyond which no new group will be created
+  private final int _innerSegmentNumGroupsLimit;
+  private final int _interSegmentNumGroupsLimit;
 
   public CombineGroupByOperator(List<Operator> operators, BrokerRequest brokerRequest, ExecutorService executorService,
-      long timeOutMs, int numGroupsLimit) {
+      long timeOutMs, int innerSegmentNumGroupsLimit) {
     Preconditions.checkArgument(brokerRequest.isSetAggregationsInfo() && brokerRequest.isSetGroupBy());
 
     _operators = operators;
     _brokerRequest = brokerRequest;
     _executorService = executorService;
     _timeOutMs = timeOutMs;
-    _numGroupsLimit = numGroupsLimit;
+    _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
+    _interSegmentNumGroupsLimit =
+        (int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE);
   }
 
   /**
@@ -103,7 +106,6 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
     CountDownLatch operatorLatch = new CountDownLatch(numOperators);
     ConcurrentHashMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
     AtomicInteger numGroups = new AtomicInteger();
-    int interSegmentNumGroupsLimit = _numGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR;
     ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
 
     AggregationFunctionContext[] aggregationFunctionContexts =
@@ -142,7 +144,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
                 GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
                 resultsMap.compute(groupKey._stringKey, (key, value) -> {
                   if (value == null) {
-                    if (numGroups.getAndIncrement() < interSegmentNumGroupsLimit) {
+                    if (numGroups.getAndIncrement() < _interSegmentNumGroupsLimit) {
                       value = new Object[numAggregationFunctions];
                       for (int i = 0; i < numAggregationFunctions; i++) {
                         value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
@@ -208,7 +210,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
 
       // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
       //       are comparing number of groups across segments with the groups limit for each segment.
-      if (resultsMap.size() >= _numGroupsLimit) {
+      if (resultsMap.size() >= _innerSegmentNumGroupsLimit) {
         mergedBlock.setNumGroupsLimitReached(true);
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org