You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/03/12 03:27:43 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mcvsubbu commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r592884346



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,100 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];

Review comment:
       In the current implementations of GroupByOrderByCombineOperator and GroupByCombineOperator, the number of threads spawned equals numOperators rather than the value returned by getNumThreadsForQuery.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,100 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processSegments(threadIndex);
+        }
+      });
+    }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+    IntermediateResultsBlock mergedBlock = mergeResultsFromSegments();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
+
+  /**
+   * processSegments will execute query on one or more segments in a single thread.
+   */
+  protected void processSegments(int threadIndex) {
+    try {
+      // Register the thread to the phaser
+      // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+        try {
+          IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+          if (isQuerySatisfied(resultsBlock)) {
+            // Query is satisfied, skip processing the remaining segments
+            _blockingQueue.offer(resultsBlock);
+            return;
+          } else {
+            _blockingQueue.offer(resultsBlock);
           }
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER
+              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+                  e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
         }
-      });
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
     }
+  }
 
+  /**
+<<<<<<< HEAD

Review comment:
       ?? (This looks like an unresolved merge-conflict marker, `<<<<<<< HEAD`, left in the code.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org