You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/14 21:07:02 UTC

[incubator-pinot] 01/01: Do not limit number of groups in combine operator (inter-segments)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch num-groups-limit
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d187704de14c40fa4f35a2ace2ace46d75488db7
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue May 14 13:59:54 2019 -0700

    Do not limit number of groups in combine operator (inter-segments)
    
    Right now we limit the number of groups stored both inner-segment
    and inter-segments with the same limit. Once a segment's result
    reaches the inner-segment limit, the combined result also reaches
    the inter-segments limit as soon as that result is merged in the
    combine operator. This causes new groups from the other segments
    to be dropped, which can produce inconsistent results depending on
    the order of segment execution.
    The solution is to limit the number of groups only inside each
    segment, not inter-segments, so that the result is consistent
    regardless of the order of execution. This won't increase the
    memory cost significantly because most of the groups stored
    inner-segment should be the same across segments.
---
 .../apache/pinot/core/operator/CombineGroupByOperator.java | 14 ++++----------
 .../java/org/apache/pinot/core/plan/CombinePlanNode.java   |  1 +
 .../pinot/core/plan/maker/InstancePlanMakerImplV2.java     |  2 +-
 3 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index dee7fd7..ad4dcc7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.ProcessingException;
@@ -95,7 +94,6 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
     int numOperators = _operators.size();
     CountDownLatch operatorLatch = new CountDownLatch(numOperators);
     ConcurrentHashMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
-    AtomicInteger numGroups = new AtomicInteger();
     ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
 
     AggregationFunctionContext[] aggregationFunctionContexts =
@@ -134,12 +132,9 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
                 GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
                 resultsMap.compute(groupKey._stringKey, (key, value) -> {
                   if (value == null) {
-                    if (numGroups.get() < _numGroupsLimit) {
-                      numGroups.getAndIncrement();
-                      value = new Object[numAggregationFunctions];
-                      for (int i = 0; i < numAggregationFunctions; i++) {
-                        value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
-                      }
+                    value = new Object[numAggregationFunctions];
+                    for (int i = 0; i < numAggregationFunctions; i++) {
+                      value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
                     }
                   } else {
                     for (int i = 0; i < numAggregationFunctions; i++) {
@@ -198,8 +193,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
       mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
       mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
       mergedBlock.setNumTotalRawDocs(executionStatistics.getNumTotalRawDocs());
-      // NOTE: numGroups might go slightly over numGroupsLimit because the comparison is not atomic
-      if (numGroups.get() >= _numGroupsLimit) {
+      if (resultsMap.size() >= _numGroupsLimit) {
         mergedBlock.setNumGroupsLimitReached(true);
       }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index ee3c888..c0cce5a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -56,6 +56,7 @@ public class CombinePlanNode implements PlanNode {
    * @param brokerRequest Broker request
    * @param executorService Executor service
    * @param timeOutMs Time out in milliseconds for query execution (not for planning phase)
+   * @param numGroupsLimit Limit of number of groups stored in each segment
    */
   public CombinePlanNode(List<PlanNode> planNodes, BrokerRequest brokerRequest, ExecutorService executorService,
       long timeOutMs, int numGroupsLimit) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 3807b16..bc62a20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -58,7 +58,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
 
   private final int _maxInitialResultHolderCapacity;
-  // Limit on number of groups, beyond which no new group will be created
+  // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
 
   @VisibleForTesting


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org