You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/06 21:57:47 UTC
[incubator-pinot] branch master updated: Fix the potential overflow for numGroupsLimit in CombineGroupByOperator (#4285)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb08648 Fix the potential overflow for numGroupsLimit in CombineGroupByOperator (#4285)
fb08648 is described below
commit fb086485d3a092914942c579b630a46ca2f22ac5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jun 6 14:57:42 2019 -0700
Fix the potential overflow for numGroupsLimit in CombineGroupByOperator (#4285)
---
.../pinot/core/operator/CombineGroupByOperator.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
index 99cf0c3..64ddc4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
@@ -64,18 +64,21 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
private final BrokerRequest _brokerRequest;
private final ExecutorService _executorService;
private final long _timeOutMs;
- // Limit on number of groups stored for each segment, beyond which no new group will be created
- private final int _numGroupsLimit;
+ // Limit on number of groups stored, beyond which no new group will be created
+ private final int _innerSegmentNumGroupsLimit;
+ private final int _interSegmentNumGroupsLimit;
public CombineGroupByOperator(List<Operator> operators, BrokerRequest brokerRequest, ExecutorService executorService,
- long timeOutMs, int numGroupsLimit) {
+ long timeOutMs, int innerSegmentNumGroupsLimit) {
Preconditions.checkArgument(brokerRequest.isSetAggregationsInfo() && brokerRequest.isSetGroupBy());
_operators = operators;
_brokerRequest = brokerRequest;
_executorService = executorService;
_timeOutMs = timeOutMs;
- _numGroupsLimit = numGroupsLimit;
+ _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
+ _interSegmentNumGroupsLimit =
+ (int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE);
}
/**
@@ -103,7 +106,6 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
CountDownLatch operatorLatch = new CountDownLatch(numOperators);
ConcurrentHashMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
AtomicInteger numGroups = new AtomicInteger();
- int interSegmentNumGroupsLimit = _numGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR;
ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
AggregationFunctionContext[] aggregationFunctionContexts =
@@ -142,7 +144,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
resultsMap.compute(groupKey._stringKey, (key, value) -> {
if (value == null) {
- if (numGroups.getAndIncrement() < interSegmentNumGroupsLimit) {
+ if (numGroups.getAndIncrement() < _interSegmentNumGroupsLimit) {
value = new Object[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
value[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
@@ -208,7 +210,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
// TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
// are comparing number of groups across segments with the groups limit for each segment.
- if (resultsMap.size() >= _numGroupsLimit) {
+ if (resultsMap.size() >= _innerSegmentNumGroupsLimit) {
mergedBlock.setNumGroupsLimitReached(true);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org