You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/21 01:12:53 UTC

[incubator-pinot] branch master updated: Increase the limit of groups in combine operator (inter-segments) (#4208)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fc51fdc  Increase the limit of groups in combine operator (inter-segments) (#4208)
fc51fdc is described below

commit fc51fdc01d2b93836346c723107ccbcd5dbc6cdc
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon May 20 18:12:47 2019 -0700

    Increase the limit of groups in combine operator (inter-segments) (#4208)
    
    Right now we limit the number of groups stored both inner-segment and
    inter-segments with the same limit. Once the segment results reach
    the inner-segment limit, they will also reach the inter-segments
    limit after the results are combined in the combine operator. This
    causes new groups from the other segments to be dropped, which
    might produce inconsistent results depending on the order of
    segment execution.
    The solution is to increase the limit (multiply by a factor of 2) of
    groups across segments to get consistent results regardless of the
    order of execution. For most cases, most groups from each segment
    should be the same, thus the total number of groups across segments
    should be equal to or slightly higher than the number of groups in
    each segment. We still put a limit across segments to protect against
    cases where data is very skewed across different segments.
---
 .../pinot/core/operator/CombineGroupByOperator.java     | 17 +++++++++++++----
 .../org/apache/pinot/core/plan/CombinePlanNode.java     |  1 +
 .../pinot/core/plan/maker/InstancePlanMakerImplV2.java  |  2 +-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index dee7fd7..99cf0c3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -54,10 +54,17 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
   private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOperator.class);
   private static final String OPERATOR_NAME = "CombineGroupByOperator";
 
+  // Use a higher limit for groups stored across segments. For most cases, most groups from each segment should be the
+  // same, thus the total number of groups across segments should be equal or slightly higher than the number of groups
+  // in each segment. We still put a limit across segments to protect cases where data is very skewed across different
+  // segments.
+  private static final int INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR = 2;
+
   private final List<Operator> _operators;
   private final BrokerRequest _brokerRequest;
   private final ExecutorService _executorService;
   private final long _timeOutMs;
+  // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
 
   public CombineGroupByOperator(List<Operator> operators, BrokerRequest brokerRequest, ExecutorService executorService,
@@ -96,6 +103,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
     CountDownLatch operatorLatch = new CountDownLatch(numOperators);
     ConcurrentHashMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
     AtomicInteger numGroups = new AtomicInteger();
+    int interSegmentNumGroupsLimit = _numGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR;
     ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
 
     AggregationFunctionContext[] aggregationFunctionContexts =
@@ -134,8 +142,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
                 GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
                 resultsMap.compute(groupKey._stringKey, (key, value) -> {
                   if (value == null) {
-                    if (numGroups.get() < _numGroupsLimit) {
-                      numGroups.getAndIncrement();
+                    if (numGroups.getAndIncrement() < interSegmentNumGroupsLimit) {
                       value = new Object[numAggregationFunctions];
                       for (int i = 0; i < numAggregationFunctions; i++) {
                         value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
@@ -198,8 +205,10 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
       mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
       mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
       mergedBlock.setNumTotalRawDocs(executionStatistics.getNumTotalRawDocs());
-      // NOTE: numGroups might go slightly over numGroupsLimit because the comparison is not atomic
-      if (numGroups.get() >= _numGroupsLimit) {
+
+      // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
+      //       are comparing number of groups across segments with the groups limit for each segment.
+      if (resultsMap.size() >= _numGroupsLimit) {
         mergedBlock.setNumGroupsLimitReached(true);
       }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index ee3c888..c0cce5a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -56,6 +56,7 @@ public class CombinePlanNode implements PlanNode {
    * @param brokerRequest Broker request
    * @param executorService Executor service
    * @param timeOutMs Time out in milliseconds for query execution (not for planning phase)
+   * @param numGroupsLimit Limit of number of groups stored in each segment
    */
   public CombinePlanNode(List<PlanNode> planNodes, BrokerRequest brokerRequest, ExecutorService executorService,
       long timeOutMs, int numGroupsLimit) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 3807b16..bc62a20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -58,7 +58,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
 
   private final int _maxInitialResultHolderCapacity;
-  // Limit on number of groups, beyond which no new group will be created
+  // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
 
   @VisibleForTesting


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org