You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/13 01:43:35 UTC
[pinot] branch master updated: Optimize combine operator to fully utilize threads (#9387)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5506871181 Optimize combine operator to fully utilize threads (#9387)
5506871181 is described below
commit 5506871181f302c0786f84a6c6877eccdf646086
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Sep 12 18:43:29 2022 -0700
Optimize combine operator to fully utilize threads (#9387)
---
.../core/operator/combine/BaseCombineOperator.java | 19 +++++++++++--------
.../combine/GroupByOrderByCombineOperator.java | 7 ++++---
...nMaxValueBasedSelectionOrderByCombineOperator.java | 11 ++++++-----
.../StreamingSelectionOnlyCombineOperator.java | 7 ++++---
4 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index cea37cefdf..2031cd030e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
@@ -60,7 +61,9 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
protected final ExecutorService _executorService;
protected final int _numTasks;
protected final Future[] _futures;
- // Use a _blockingQueue to store the intermediate results blocks
+ // Use an AtomicInteger to track the next operator to execute
+ protected final AtomicInteger _nextOperatorId = new AtomicInteger();
+ // Use a BlockingQueue to store the intermediate results blocks
protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);
@@ -86,7 +89,6 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
Phaser phaser = new Phaser(1);
Tracing.activeRecording().setNumTasks(_numTasks);
for (int i = 0; i < _numTasks; i++) {
- int taskIndex = i;
_futures[i] = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
@@ -100,7 +102,7 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
return;
}
try {
- processSegments(taskIndex);
+ processSegments();
} catch (EarlyTerminationException e) {
// Early-terminated by interruption (canceled by the main thread)
} catch (Exception e) {
@@ -151,9 +153,10 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
/**
* Executes query on one or more segments in a worker thread.
*/
- protected void processSegments(int taskIndex) {
- for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
- Operator operator = _operators.get(operatorIndex);
+ protected void processSegments() {
+ int operatorId;
+ while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+ Operator operator = _operators.get(operatorId);
T resultsBlock;
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
@@ -176,14 +179,14 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
}
/**
- * Invoked when {@link #processSegments(int)} throws exception.
+ * Invoked when {@link #processSegments()} throws exception.
*/
protected void onException(Exception e) {
_blockingQueue.offer(new ExceptionResultsBlock(e));
}
/**
- * Invoked when {@link #processSegments(int)} is finished (called in the finally block).
+ * Invoked when {@link #processSegments()} is finished (called in the finally block).
*/
protected void onFinish() {
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index bb49dbfce2..19c7537a8d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -129,9 +129,10 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator<GroupByRe
* Executes query on one segment in a worker thread and merges the results into the indexed table.
*/
@Override
- protected void processSegments(int taskIndex) {
- for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
- Operator operator = _operators.get(operatorIndex);
+ protected void processSegments() {
+ int operatorId;
+ while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+ Operator operator = _operators.get(operatorId);
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 72b706c2dc..f157ce6158 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -129,7 +129,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
* documents to fulfill the LIMIT and OFFSET requirement.
*/
@Override
- protected void processSegments(int taskIndex) {
+ protected void processSegments() {
List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
assert orderByExpressions != null;
int numOrderByExpressions = orderByExpressions.size();
@@ -144,7 +144,8 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
// segment result is merged.
Comparable threadBoundaryValue = null;
- for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+ int operatorId;
+ while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
// Calculate the boundary value from global boundary and thread boundary
Comparable boundaryValue = _globalBoundaryValue.get();
if (boundaryValue == null) {
@@ -164,7 +165,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
}
// Check if the segment can be skipped
- MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
+ MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorId);
if (boundaryValue != null) {
if (asc) {
// For ascending order, no need to process more segments if the column min value is larger than the
@@ -172,7 +173,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
if (minMaxValueContext._minValue != null) {
int result = minMaxValueContext._minValue.compareTo(boundaryValue);
if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
@@ -183,7 +184,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
if (minMaxValueContext._maxValue != null) {
int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index 27e8a2f46b..e19a45d1fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -71,9 +71,10 @@ public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator<S
}
@Override
- protected void processSegments(int threadIndex) {
- for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
- Operator operator = _operators.get(operatorIndex);
+ protected void processSegments() {
+ int operatorId;
+ while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+ Operator operator = _operators.get(operatorId);
SelectionResultsBlock resultsBlock;
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org