You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/06 19:05:51 UTC

[incubator-pinot] branch fix_group_limit created (now 1108398)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch fix_group_limit
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 1108398  Fix the potential overflow for numGroupsLimit in CombineGroupByOperator

This branch includes the following new commits:

     new 1108398  Fix the potential overflow for numGroupsLimit in CombineGroupByOperator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Fix the potential overflow for numGroupsLimit in CombineGroupByOperator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch fix_group_limit
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 11083988d3d31416d83c3fbee26beed3f16753aa
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Jun 6 12:05:13 2019 -0700

    Fix the potential overflow for numGroupsLimit in CombineGroupByOperator
---
 .../pinot/core/operator/CombineGroupByOperator.java      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index 99cf0c3..64ddc4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -64,18 +64,21 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
   private final BrokerRequest _brokerRequest;
   private final ExecutorService _executorService;
   private final long _timeOutMs;
-  // Limit on number of groups stored for each segment, beyond which no new group will be created
-  private final int _numGroupsLimit;
+  // Limit on number of groups stored, beyond which no new group will be created
+  private final int _innerSegmentNumGroupsLimit;
+  private final int _interSegmentNumGroupsLimit;
 
   public CombineGroupByOperator(List<Operator> operators, BrokerRequest brokerRequest, ExecutorService executorService,
-      long timeOutMs, int numGroupsLimit) {
+      long timeOutMs, int innerSegmentNumGroupsLimit) {
     Preconditions.checkArgument(brokerRequest.isSetAggregationsInfo() && brokerRequest.isSetGroupBy());
 
     _operators = operators;
     _brokerRequest = brokerRequest;
     _executorService = executorService;
     _timeOutMs = timeOutMs;
-    _numGroupsLimit = numGroupsLimit;
+    _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
+    _interSegmentNumGroupsLimit =
+        (int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE);
   }
 
   /**
@@ -103,7 +106,6 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
     CountDownLatch operatorLatch = new CountDownLatch(numOperators);
     ConcurrentHashMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
     AtomicInteger numGroups = new AtomicInteger();
-    int interSegmentNumGroupsLimit = _numGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR;
     ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
 
     AggregationFunctionContext[] aggregationFunctionContexts =
@@ -142,7 +144,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
                 GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
                 resultsMap.compute(groupKey._stringKey, (key, value) -> {
                   if (value == null) {
-                    if (numGroups.getAndIncrement() < interSegmentNumGroupsLimit) {
+                    if (numGroups.getAndIncrement() < _interSegmentNumGroupsLimit) {
                       value = new Object[numAggregationFunctions];
                       for (int i = 0; i < numAggregationFunctions; i++) {
                         value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
@@ -208,7 +210,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
 
       // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
       //       are comparing number of groups across segments with the groups limit for each segment.
-      if (resultsMap.size() >= _numGroupsLimit) {
+      if (resultsMap.size() >= _innerSegmentNumGroupsLimit) {
         mergedBlock.setNumGroupsLimitReached(true);
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org