You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/03/11 19:04:25 UTC

[GitHub] [incubator-pinot] mqliang opened a new pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

mqliang opened a new pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670


   Extends GroupByCombineOperator and GroupByOrderByCombineOperator from BaseCombineOperator
   
   ## Description
   
   This PR extends GroupByCombineOperator and GroupByOrderByCombineOperator from BaseCombineOperator, so that all combine operator will share the same getNextBlock() function, so that we can put some metric instrumentation/execution statistics logic in one place.
   
   ## Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release.
   
   If you have a series of commits adding or enabling a feature, then
   add this section only in final commit that marks the feature completed.
   Refer to earlier release notes to see examples of text
   
   ## Documentation
   If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mqliang closed pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
mqliang closed pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mqliang closed pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
mqliang closed pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r592695950



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,90 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  private final Phaser _phaser = new Phaser(1);
+  private final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processBlock(threadIndex);
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = mergeBlock();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+  protected void processBlock(int threadIndex) {
+    try {
+      // Register the thread to the phaser
+      // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+        try {
+          IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+          if (isQuerySatisfied(resultsBlock)) {
+            // Query is satisfied, skip processing the remaining segments
+            _blockingQueue.offer(resultsBlock);
+            return;
+          } else {
+            _blockingQueue.offer(resultsBlock);
           }
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER
+              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+                  e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
         }
-      });
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
     }
+  }
 
+  protected IntermediateResultsBlock mergeBlock() {

Review comment:
       (nit) consider renaming this to mergeResultsFromSegments()
   Also, please add a small javadoc highlighting that this thread will merge the results across all segments

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,90 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  private final Phaser _phaser = new Phaser(1);
+  private final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processBlock(threadIndex);
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = mergeBlock();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+  protected void processBlock(int threadIndex) {

Review comment:
       (nit) consider renaming this to processSegments() or executeOnSegments()
   Also, please add a small javadoc highlighting that this will execute query on one or more segments (aka operators in the code) in the thread




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r592696550



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
##########
@@ -59,11 +59,10 @@
  *   - Try to extend BaseCombineOperator to reduce duplicate code

Review comment:
       You can remove this TODO now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r592696228



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
##########
@@ -54,7 +52,7 @@
  *   - Try to extend BaseCombineOperator to reduce duplicate code

Review comment:
       You can remove this TODO now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r592695980



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,90 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  private final Phaser _phaser = new Phaser(1);
+  private final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processBlock(threadIndex);
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = mergeBlock();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+  protected void processBlock(int threadIndex) {

Review comment:
       (nit) consider renaming this to processSegments() or executeOnSegments()
   Also, please add a small javadoc highlighting that this will execute query on one or more segments (aka operators in the code) in a single thread




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r593043930



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,96 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  protected final int _numOperators;
+  protected final int _numThreads;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);

Review comment:
       We should try to unify this behavior of computing the number of threads (essentially tasks that we submit to the pool) in the outer loop.
   
   In the existing code, BaseCombineOperator and it's subclasses derive numThreads from numOperators using CombineOperatorUtils.getNumThreadsForQuery
   
   However, GroupByCombineOperator and GroupByOrderByCombineOperator don't derive numThreads from numOperators. They simply use numOperators in the outer loop. So, as part of the current refactoring to make all  combine operators extend BaseCombineOperator, we should stick to the existing behavior. I think numThreads has to be passed to super class
   
   Later on, we can see if the numThreads computation can be unified in a separate PR




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r593043930



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,96 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  protected final int _numOperators;
+  protected final int _numThreads;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);

Review comment:
       We should try to unify this behavior of computing the number of threads (essentially tasks that we submit to the pool) in the outer loop.
   
   In the existing code, BaseCombineOperator and it's subclasses derive numThreads from numOperators using CombineOperatorUtils.getNumThreadsForQuery
   
   However, GroupByCombineOperator and GroupByOrderByCombineOperator don't derive numThreads from numOperators. They simply use numOperators in the outer loop. So, as part of the current refactoring to make all  combine operators extend BaseCombineOperator, we should stick to the existing behavior. Later on, we can see if this can be unified in a different PR. I think numThreads has to be passed to super class




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r592696228



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
##########
@@ -54,7 +52,7 @@
  *   - Try to extend BaseCombineOperator to reduce duplicate code

Review comment:
       We can remove this TODO now

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
##########
@@ -59,11 +59,10 @@
  *   - Try to extend BaseCombineOperator to reduce duplicate code

Review comment:
       We can remove this TODO now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mqliang commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
mqliang commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r592714750



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,90 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  private final Phaser _phaser = new Phaser(1);
+  private final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processBlock(threadIndex);
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = mergeBlock();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+  protected void processBlock(int threadIndex) {
+    try {
+      // Register the thread to the phaser
+      // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+        try {
+          IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+          if (isQuerySatisfied(resultsBlock)) {
+            // Query is satisfied, skip processing the remaining segments
+            _blockingQueue.offer(resultsBlock);
+            return;
+          } else {
+            _blockingQueue.offer(resultsBlock);
           }
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER
+              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+                  e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
         }
-      });
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
     }
+  }
 
+  protected IntermediateResultsBlock mergeBlock() {

Review comment:
       done

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
##########
@@ -54,7 +52,7 @@
  *   - Try to extend BaseCombineOperator to reduce duplicate code

Review comment:
       done

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
##########
@@ -59,11 +59,10 @@
  *   - Try to extend BaseCombineOperator to reduce duplicate code

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6670: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from…

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #6670:
URL: https://github.com/apache/incubator-pinot/pull/6670#discussion_r593043930



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,96 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  protected final int _numOperators;
+  protected final int _numThreads;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);

Review comment:
       We should try to unify this behavior of computing the number of threads (essentially tasks that we submit to the pool) in the outer loop.
   
   In the existing code, BaseCombineOperator and it's subclasses derive numThreads from numOperators using CombineOperatorUtils.getNumThreadsForQuery
   
   However, GroupByCombineOperator and GroupByOrderByCombineOperator don't derive numThreads from numOperators. They simply use numOperators in the outer loop. So, as part of the current refactoring to make all  combine operators extend BaseCombineOperator, we should stick to the existing behavior. I think numThreads has to be passed to the base class from the subclasses
   
   Later on, we can see if the numThreads computation can be unified in a separate PR




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org