You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/03/18 01:08:53 UTC
[incubator-pinot] branch master updated: Combine operators: remove
redundant variables and override logger in subclass (#6690)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new acc24a1 Combine operators: remove redundant variables and override logger in subclass (#6690)
acc24a1 is described below
commit acc24a105f3c2d728ed2589e4838108769e82e2c
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Wed Mar 17 18:08:32 2021 -0700
Combine operators: remove redundant variables and override logger in subclass (#6690)
- remove redundant variables in subclass
- override logger in subclass for easy debugging
- remove unnecessary constructor in BaseCombineOperator since subclasses have access to protected variables in the superclass.
---
.../core/operator/combine/BaseCombineOperator.java | 9 ++++-----
.../operator/combine/GroupByCombineOperator.java | 1 +
.../combine/GroupByOrderByCombineOperator.java | 1 +
...MaxValueBasedSelectionOrderByCombineOperator.java | 20 ++++++--------------
.../combine/SelectionOnlyCombineOperator.java | 3 +++
.../combine/SelectionOrderByCombineOperator.java | 3 +++
6 files changed, 18 insertions(+), 19 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 1ed5fc1..aef6cd1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -45,23 +45,22 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings("rawtypes")
public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> {
- protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
protected final List<Operator> _operators;
protected final QueryContext _queryContext;
protected final ExecutorService _executorService;
protected final long _endTimeMs;
-
protected final int _numOperators;
- protected int _numThreads;
// Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
// returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
// deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
// behavior (even JVM crash) when processing queries against it.
protected final Phaser _phaser = new Phaser(1);
- protected Future[] _futures;
// Use a _blockingQueue to store the per-segment result
- private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+ protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+ protected int _numThreads;
+ protected Future[] _futures;
public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
long endTimeMs) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index ff40b2e..dadb23a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -79,6 +79,7 @@ public class GroupByCombineOperator extends BaseCombineOperator {
public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
long endTimeMs, int innerSegmentNumGroupsLimit) {
+ // GroupByCombineOperator use numOperators as numThreads
super(operators, queryContext, executorService, endTimeMs, operators.size());
_innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
_interSegmentNumGroupsLimit =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 99ba9df..38f1a54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -79,6 +79,7 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long endTimeMs, int trimThreshold) {
+ // GroupByOrderByCombineOperator use numOperators as numThreads
super(operators, queryContext, executorService, endTimeMs, operators.size());
_initLock = new ReentrantLock();
_trimSize = GroupByUtils.getTableCapacity(_queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 901d6bc..29359f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -22,11 +22,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,6 +37,8 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -54,18 +53,14 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombineOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MinMaxValueBasedSelectionOrderByCombineOperator.class);
private static final String OPERATOR_NAME = "MinMaxValueBasedSelectionOrderByCombineOperator";
-
// For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
// special IntermediateResultsBlock into the BlockingQueue to awake the main thread
private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
Collections.emptyList());
- private final int _numOperators;
- private final int numThreads;
- // Use a BlockingQueue to store the per-segment result
- private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
// Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();
@@ -76,9 +71,6 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) {
super(operators, queryContext, executorService, endTimeMs);
_minMaxValueContexts = minMaxValueContexts;
- _numOperators = _operators.size();
- numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
- _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
}
@@ -118,7 +110,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
// segment result is merged.
Comparable threadBoundaryValue = null;
- for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += numThreads) {
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
// Calculate the boundary value from global boundary and thread boundary
Comparable boundaryValue = _globalBoundaryValue.get();
if (boundaryValue == null) {
@@ -146,7 +138,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
if (minMaxValueContext._minValue != null) {
int result = minMaxValueContext._minValue.compareTo(boundaryValue);
if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads);
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
@@ -157,7 +149,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
if (minMaxValueContext._maxValue != null) {
int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads);
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
index 2da4c56..f2d4d06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
@@ -27,6 +27,8 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -37,6 +39,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
*/
@SuppressWarnings("rawtypes")
public class SelectionOnlyCombineOperator extends BaseCombineOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOnlyCombineOperator.class);
private static final String OPERATOR_NAME = "SelectionOnlyCombineOperator";
private final int _numRowsToKeep;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index be899fb..dfd34e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -32,6 +32,8 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -43,6 +45,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class SelectionOrderByCombineOperator extends BaseCombineOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOrderByCombineOperator.class);
private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
private final List<Operator> _operators;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org