You are viewing a plain-text version of this content. The canonical version is available via the original mailing-list archive link, which is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/13 01:43:35 UTC

[pinot] branch master updated: Optimize combine operator to fully utilize threads (#9387)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5506871181 Optimize combine operator to fully utilize threads (#9387)
5506871181 is described below

commit 5506871181f302c0786f84a6c6877eccdf646086
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Sep 12 18:43:29 2022 -0700

    Optimize combine operator to fully utilize threads (#9387)
---
 .../core/operator/combine/BaseCombineOperator.java    | 19 +++++++++++--------
 .../combine/GroupByOrderByCombineOperator.java        |  7 ++++---
 ...nMaxValueBasedSelectionOrderByCombineOperator.java | 11 ++++++-----
 .../StreamingSelectionOnlyCombineOperator.java        |  7 ++++---
 4 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index cea37cefdf..2031cd030e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
@@ -60,7 +61,9 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
   protected final ExecutorService _executorService;
   protected final int _numTasks;
   protected final Future[] _futures;
-  // Use a _blockingQueue to store the intermediate results blocks
+  // Use an AtomicInteger to track the next operator to execute
+  protected final AtomicInteger _nextOperatorId = new AtomicInteger();
+  // Use a BlockingQueue to store the intermediate results blocks
   protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
   protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);
 
@@ -86,7 +89,6 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
     Phaser phaser = new Phaser(1);
     Tracing.activeRecording().setNumTasks(_numTasks);
     for (int i = 0; i < _numTasks; i++) {
-      int taskIndex = i;
       _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
@@ -100,7 +102,7 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
             return;
           }
           try {
-            processSegments(taskIndex);
+            processSegments();
           } catch (EarlyTerminationException e) {
             // Early-terminated by interruption (canceled by the main thread)
           } catch (Exception e) {
@@ -151,9 +153,10 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
   /**
    * Executes query on one or more segments in a worker thread.
    */
-  protected void processSegments(int taskIndex) {
-    for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
-      Operator operator = _operators.get(operatorIndex);
+  protected void processSegments() {
+    int operatorId;
+    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
       T resultsBlock;
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
@@ -176,14 +179,14 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
   }
 
   /**
-   * Invoked when {@link #processSegments(int)} throws exception.
+   * Invoked when {@link #processSegments()} throws exception.
    */
   protected void onException(Exception e) {
     _blockingQueue.offer(new ExceptionResultsBlock(e));
   }
 
   /**
-   * Invoked when {@link #processSegments(int)} is finished (called in the finally block).
+   * Invoked when {@link #processSegments()} is finished (called in the finally block).
    */
   protected void onFinish() {
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index bb49dbfce2..19c7537a8d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -129,9 +129,10 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator<GroupByRe
    * Executes query on one segment in a worker thread and merges the results into the indexed table.
    */
   @Override
-  protected void processSegments(int taskIndex) {
-    for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
-      Operator operator = _operators.get(operatorIndex);
+  protected void processSegments() {
+    int operatorId;
+    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
           ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 72b706c2dc..f157ce6158 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -129,7 +129,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
    * documents to fulfill the LIMIT and OFFSET requirement.
    */
   @Override
-  protected void processSegments(int taskIndex) {
+  protected void processSegments() {
     List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
     assert orderByExpressions != null;
     int numOrderByExpressions = orderByExpressions.size();
@@ -144,7 +144,8 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
     //       segment result is merged.
     Comparable threadBoundaryValue = null;
 
-    for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+    int operatorId;
+    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
       // Calculate the boundary value from global boundary and thread boundary
       Comparable boundaryValue = _globalBoundaryValue.get();
       if (boundaryValue == null) {
@@ -164,7 +165,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
       }
 
       // Check if the segment can be skipped
-      MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
+      MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorId);
       if (boundaryValue != null) {
         if (asc) {
           // For ascending order, no need to process more segments if the column min value is larger than the
@@ -172,7 +173,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
           if (minMaxValueContext._minValue != null) {
             int result = minMaxValueContext._minValue.compareTo(boundaryValue);
             if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
-              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+              _numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
               _blockingQueue.offer(LAST_RESULTS_BLOCK);
               return;
             }
@@ -183,7 +184,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
           if (minMaxValueContext._maxValue != null) {
             int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
             if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
-              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+              _numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
               _blockingQueue.offer(LAST_RESULTS_BLOCK);
               return;
             }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index 27e8a2f46b..e19a45d1fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -71,9 +71,10 @@ public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator<S
   }
 
   @Override
-  protected void processSegments(int threadIndex) {
-    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
-      Operator operator = _operators.get(operatorIndex);
+  protected void processSegments() {
+    int operatorId;
+    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
       SelectionResultsBlock resultsBlock;
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org