Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/03/12 19:01:17 UTC

[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6678: Extends GroupByCombineOperator and GroupByOrderByCombineOperator from BaseCombineOperator

siddharthteotia commented on a change in pull request #6678:
URL: https://github.com/apache/incubator-pinot/pull/6678#discussion_r593383934



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,102 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  protected final int _numOperators;
+  protected int _numThreads;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);

Review comment:
       We should try to unify this behavior of computing the number of threads (essentially tasks that we submit to the pool) in the outer loop.
   
   In the existing code, BaseCombineOperator and its subclasses derive numThreads from numOperators using CombineOperatorUtils.getNumThreadsForQuery.
   
   However, GroupByCombineOperator and GroupByOrderByCombineOperator don't derive numThreads from numOperators; they simply use numOperators in the outer loop. So, as part of the current refactoring to make all combine operators extend BaseCombineOperator, we should stick to the existing behavior. I think numThreads has to be passed to the base class from the subclasses.
   
   Later on, we can look at unifying the numThreads computation in a separate PR.
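
A rough sketch of what passing numThreads from the subclasses could look like. The class names, the `List<String>` stand-in for operators, and the thread cap in `SketchUtils` are hypothetical and only illustrate the idea; the real CombineOperatorUtils.getNumThreadsForQuery logic and the actual constructors may differ.

```java
import java.util.List;

// Simplified stand-in for BaseCombineOperator (hypothetical, not the real Pinot class).
// The base class no longer computes the thread count; each subclass supplies it.
abstract class SketchBaseCombineOperator {
  protected final int _numOperators;
  protected final int _numThreads;

  protected SketchBaseCombineOperator(List<String> operators, int numThreads) {
    _numOperators = operators.size();
    _numThreads = numThreads;
  }

  int getNumThreads() {
    return _numThreads;
  }
}

// Hypothetical helper; the cap of 4 is an assumption made for this sketch only.
final class SketchUtils {
  static final int MAX_THREADS = 4;

  private SketchUtils() {
  }

  static int getNumThreadsForQuery(int numOperators) {
    return Math.max(1, Math.min(numOperators, MAX_THREADS));
  }
}

// Keeps the current behavior of the non-group-by operators:
// numThreads is derived from numOperators through the utility method.
class SketchSelectionCombineOperator extends SketchBaseCombineOperator {
  SketchSelectionCombineOperator(List<String> operators) {
    super(operators, SketchUtils.getNumThreadsForQuery(operators.size()));
  }
}

// Keeps the current behavior of the group-by operators:
// one task per operator, so numThreads is simply numOperators.
class SketchGroupByCombineOperator extends SketchBaseCombineOperator {
  SketchGroupByCombineOperator(List<String> operators) {
    super(operators, operators.size());
  }
}
```

With this shape, the refactoring would not change how many tasks each operator submits, and a later PR could unify the computation by changing only what the subclasses pass in.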



