Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/03/11 23:13:14 UTC

[GitHub] [incubator-pinot] mqliang opened a new pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mqliang opened a new pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672


   ## Description
   
   Only the third commit needs to be reviewed; the first two are the same as those in https://github.com/apache/incubator-pinot/pull/6670
   
   ## Upgrade Notes
   Does this PR prevent a zero-downtime upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade issue introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release.
   
   If you have a series of commits adding or enabling a feature, then
   add this section only in the final commit that marks the feature as completed.
   Refer to earlier release notes to see examples of text.
   
   ## Documentation
   If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mqliang commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mqliang commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r593457823



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,100 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processSegments(threadIndex);
+        }
+      });
+    }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+    IntermediateResultsBlock mergedBlock = mergeResultsFromSegments();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
+
+  /**
+   * processSegments will execute query on one or more segments in a single thread.
+   */
+  protected void processSegments(int threadIndex) {
+    try {
+      // Register the thread to the phaser
+      // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+        try {
+          IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+          if (isQuerySatisfied(resultsBlock)) {
+            // Query is satisfied, skip processing the remaining segments
+            _blockingQueue.offer(resultsBlock);
+            return;
+          } else {
+            _blockingQueue.offer(resultsBlock);
           }
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER
+              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+                  e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
         }
-      });
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
     }
+  }
 
+  /**
+<<<<<<< HEAD

Review comment:
       fixed
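A minimal standalone sketch of the lifecycle that the `_phaser` comment in the `BaseCombineOperator` diff above describes, using only `java.util.concurrent`. It is not Pinot code: segments are plain ints, and `PhaserCombineSketch`, `combine` and `_inFlight` are hypothetical names. The main-thread wait via `awaitAdvance(arriveAndDeregister())` is an assumption about how the merge side could honor that comment, since `mergeResultsFromSegments()` is not shown in this thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Standalone sketch of a Phaser-guarded fan-out (hypothetical simplification:
 * a "segment" is an int and its per-segment result is the int itself).
 */
class PhaserCombineSketch {
  private final int[] _segments;
  private final int _numThreads;
  private final BlockingQueue<Integer> _blockingQueue;
  // The main thread is the single party registered up front.
  private final Phaser _phaser = new Phaser(1);
  // Segments currently being read by a worker; must be 0 once combine() returns.
  final AtomicInteger _inFlight = new AtomicInteger();

  PhaserCombineSketch(int[] segments, int numThreads) {
    _segments = segments;
    _numThreads = numThreads;
    _blockingQueue = new ArrayBlockingQueue<>(Math.max(1, segments.length));
  }

  /** Worker body: register, process every _numThreads-th segment, always deregister. */
  private void processSegments(int threadIndex) {
    // Negative means the main thread already deregistered and the phaser terminated.
    if (_phaser.register() < 0) {
      return;
    }
    try {
      for (int i = threadIndex; i < _segments.length; i += _numThreads) {
        _inFlight.incrementAndGet();
        try {
          _blockingQueue.offer(_segments[i]);
        } finally {
          _inFlight.decrementAndGet();
        }
      }
    } finally {
      _phaser.arriveAndDeregister();
    }
  }

  /** Main-thread side: submit workers, merge until done or timed out, then wait for workers. */
  long combine(ExecutorService executorService, long timeoutMs) {
    Future<?>[] futures = new Future<?>[_numThreads];
    for (int t = 0; t < _numThreads; t++) {
      final int threadIndex = t;
      futures[t] = executorService.submit(() -> processSegments(threadIndex));
    }
    long sum = 0;
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    try {
      for (int received = 0; received < _segments.length; received++) {
        Integer result = _blockingQueue.poll(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        if (result == null) {
          break; // timed out: return a partial result
        }
        sum += result;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      for (Future<?> future : futures) {
        future.cancel(true); // tasks that never started will never register
      }
      // Deregister the main thread, then block until every registered worker has deregistered too.
      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
    }
    return sum;
  }
}
```

With segments `{1, ..., 8}` and three worker threads, `combine(pool, 5_000)` returns 36, and `_inFlight` is 0 once it returns: no worker can still be reading a segment after the main thread has left.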






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

siddharthteotia commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r595592627



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
##########
@@ -79,253 +66,70 @@ public String getOperatorName() {
     return OPERATOR_NAME;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks
+   * into BlockingQueue. Try to use
+   * {@link org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator} first, which
+   * will skip processing some segments based on the column min/max value. Otherwise fall back to the default combine
+   * (process all segments).
+   */
   @Override
   protected IntermediateResultsBlock getNextBlock() {
     List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
     assert orderByExpressions != null;
     if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
-      return minMaxValueBasedCombine();
-    } else {
-      return super.getNextBlock();
-    }
-  }
-
-  private IntermediateResultsBlock minMaxValueBasedCombine() {
-    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
-    assert orderByExpressions != null;
-    int numOrderByExpressions = orderByExpressions.size();
-    assert numOrderByExpressions > 0;
-    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
-    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
-    String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier();
-    boolean asc = firstOrderByExpression.isAsc();
-
-    int numOperators = _operators.size();
-    List<MinMaxValueContext> minMaxValueContexts = new ArrayList<>(numOperators);
-    for (Operator operator : _operators) {
-      minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
-    }
-    try {
-      if (asc) {
-        // For ascending order, sort on column min value in ascending order
-        minMaxValueContexts.sort((o1, o2) -> {
-          // Put segments without column min value in the front because we always need to process them
-          if (o1._minValue == null) {
-            return o2._minValue == null ? 0 : -1;
-          }
-          if (o2._minValue == null) {
-            return 1;
-          }
-          return o1._minValue.compareTo(o2._minValue);
-        });
-      } else {
-        // For descending order, sort on column max value in descending order
-        minMaxValueContexts.sort((o1, o2) -> {
-          // Put segments without column max value in the front because we always need to process them
-          if (o1._maxValue == null) {
-            return o2._maxValue == null ? 0 : -1;
-          }
-          if (o2._maxValue == null) {
-            return 1;
-          }
-          return o2._maxValue.compareTo(o1._maxValue);
-        });
+      int numOrderByExpressions = orderByExpressions.size();

Review comment:
       (nit) To make this cleaner, I suggest moving all the code under this if block into a separate function, something like tryMinMaxValueBasedCombine (similar to the old code). This will move the else part (currently at line 131) all the way up.
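One way the suggested split could look, as a hypothetical sketch with simplified stand-in types (`SelectionOrderByDispatchSketch`, `ExpressionType` and the two `Supplier`s are not Pinot APIs): `getNextBlock()` returns early for the non-identifier case, and everything min/max-specific lives in `tryMinMaxValueBasedCombine()`. The fallback inside that method (catching `ClassCastException` or `IllegalStateException` when min/max values cannot be compared) is an assumption; the thread does not say what would make the min/max path fall back.

```java
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the suggested refactor: getNextBlock() only dispatches, and all the
 * min/max-specific logic moves into tryMinMaxValueBasedCombine(). Types are simplified stand-ins.
 */
class SelectionOrderByDispatchSketch {
  enum ExpressionType { IDENTIFIER, FUNCTION, LITERAL }

  private final List<ExpressionType> _orderByExpressionTypes;
  private final Supplier<String> _minMaxValueBasedCombine; // stands in for the min/max-based combine
  private final Supplier<String> _defaultCombine; // stands in for super.getNextBlock()

  SelectionOrderByDispatchSketch(List<ExpressionType> orderByExpressionTypes,
      Supplier<String> minMaxValueBasedCombine, Supplier<String> defaultCombine) {
    _orderByExpressionTypes = orderByExpressionTypes;
    _minMaxValueBasedCombine = minMaxValueBasedCombine;
    _defaultCombine = defaultCombine;
  }

  String getNextBlock() {
    // The former "else" branch becomes an early return at the top.
    if (_orderByExpressionTypes.get(0) != ExpressionType.IDENTIFIER) {
      return _defaultCombine.get();
    }
    return tryMinMaxValueBasedCombine();
  }

  private String tryMinMaxValueBasedCombine() {
    try {
      return _minMaxValueBasedCombine.get();
    } catch (ClassCastException | IllegalStateException e) {
      // Assumed fallback trigger: segment min/max values that cannot be compared with each other.
      return _defaultCombine.get();
    }
  }
}
```

The design point is only the shape: the dispatch method stays a few lines long, and the optimized path can grow without pushing the default branch to the bottom of the method.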






[GitHub] [incubator-pinot] mqliang commented on pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mqliang commented on pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#issuecomment-800730885


   The integration test has been stuck for ~2 hours; closing and reopening to trigger a re-run.




[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mcvsubbu commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r592884346



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,100 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];

Review comment:
       In the current implementation of GroupByOrderByCombineOperator and GroupByCombineOperator, the number of threads spawned equals numOperators rather than the value returned by getNumThreadsForQuery.
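The strided loop in `processSegments()` (`operatorIndex = threadIndex; ...; operatorIndex += _numThreads`) is why the thread count matters: with fewer threads than operators, each thread handles several segments; with one thread per operator, as described above for the group-by combine operators, each thread handles exactly one. A small sketch of the assignment follows; `numThreads(numOperators, maxThreadsPerQuery)` is a hypothetical stand-in for `CombineOperatorUtils.getNumThreadsForQuery()`, whose actual formula is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of how operators (one per segment) are spread across worker threads. */
final class OperatorAssignmentSketch {
  private OperatorAssignmentSketch() {
  }

  /** Hypothetical cap standing in for getNumThreadsForQuery(); the real formula may differ. */
  static int numThreads(int numOperators, int maxThreadsPerQuery) {
    return Math.max(1, Math.min(numOperators, maxThreadsPerQuery));
  }

  /** For each thread, the operator indexes it would process using the strided loop from processSegments(). */
  static List<List<Integer>> assign(int numOperators, int numThreads) {
    List<List<Integer>> assignment = new ArrayList<>(numThreads);
    for (int threadIndex = 0; threadIndex < numThreads; threadIndex++) {
      List<Integer> operatorIndexes = new ArrayList<>();
      for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
        operatorIndexes.add(operatorIndex);
      }
      assignment.add(operatorIndexes);
    }
    return assignment;
  }
}
```

For example, `assign(10, numThreads(10, 4))` yields `[[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]`, while `assign(3, 3)` yields `[[0], [1], [2]]`.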

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,100 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
+    for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
+      _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
+          processSegments(threadIndex);
+        }
+      });
+    }
 
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              try {
-                IntermediateResultsBlock resultsBlock =
-                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-                if (isQuerySatisfied(resultsBlock)) {
-                  // Query is satisfied, skip processing the remaining segments
-                  blockingQueue.offer(resultsBlock);
-                  return;
-                } else {
-                  blockingQueue.offer(resultsBlock);
-                }
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+    IntermediateResultsBlock mergedBlock = mergeResultsFromSegments();
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
+
+  /**
+   * processSegments will execute query on one or more segments in a single thread.
+   */
+  protected void processSegments(int threadIndex) {
+    try {
+      // Register the thread to the phaser
+      // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+        try {
+          IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+          if (isQuerySatisfied(resultsBlock)) {
+            // Query is satisfied, skip processing the remaining segments
+            _blockingQueue.offer(resultsBlock);
+            return;
+          } else {
+            _blockingQueue.offer(resultsBlock);
           }
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER
+              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+                  e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
         }
-      });
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
     }
+  }
 
+  /**
+<<<<<<< HEAD

Review comment:
       ??






[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

siddharthteotia commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r593525978



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+
+
+public class MinMaxValueContext {

Review comment:
       Maybe consider making this an inner static class if it is only used by SelectionOrderByCombineOperator.






[GitHub] [incubator-pinot] mqliang commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mqliang commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r593530030



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+
+
+public class MinMaxValueContext {

Review comment:
       It is used by both SelectionOrderByCombineOperator and MinMaxValueBasedSelectionOrderByCombineOperator (an optimized version of SelectionOrderByCombineOperator).
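Because the class is shared by two operators, it stays package-level rather than nested. The sketch below is a hypothetical stand-in (`MinMaxValueContextSketch`, its constructor and `toString()` are not the PR's code) that holds one segment's min/max for the first ORDER BY column and reproduces the two orderings from the sort code removed in the `SelectionOrderByCombineOperator` diff: ascending by min value and descending by max value, with segments lacking the value placed first because they must always be processed. It swaps the hand-written comparators for the JDK's `Comparator.nullsFirst`, which should yield the same order.

```java
import java.util.Comparator;

/**
 * Hypothetical stand-in for MinMaxValueContext: one segment's min/max value for the first ORDER BY column.
 * A null value means the segment has no such metadata and must always be processed.
 */
final class MinMaxValueContextSketch<T extends Comparable<? super T>> {
  final String _segmentName;
  final T _minValue;
  final T _maxValue;

  MinMaxValueContextSketch(String segmentName, T minValue, T maxValue) {
    _segmentName = segmentName;
    _minValue = minValue;
    _maxValue = maxValue;
  }

  /** ASC: segments without a min value first, then by min value ascending. */
  static <T extends Comparable<? super T>> Comparator<MinMaxValueContextSketch<T>> ascByMinValue() {
    return Comparator.comparing((MinMaxValueContextSketch<T> c) -> c._minValue,
        Comparator.nullsFirst(Comparator.<T>naturalOrder()));
  }

  /** DESC: segments without a max value first, then by max value descending. */
  static <T extends Comparable<? super T>> Comparator<MinMaxValueContextSketch<T>> descByMaxValue() {
    return Comparator.comparing((MinMaxValueContextSketch<T> c) -> c._maxValue,
        Comparator.nullsFirst(Comparator.<T>reverseOrder()));
  }

  @Override
  public String toString() {
    return _segmentName;
  }
}
```

Sorting segments `s1` (min 5, max 9), `s2` (no metadata) and `s3` (min 1, max 4) gives `[s2, s3, s1]` for ascending order and `[s2, s1, s3]` for descending order.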






[GitHub] [incubator-pinot] mqliang commented on a change in pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mqliang commented on a change in pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672#discussion_r593458091



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
##########
@@ -52,77 +52,100 @@
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
 
+  private final int _numOperators;
+  private final int _numThreads;
+  // Use a _blockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+  // behavior (even JVM crash) when processing queries against it.
+  protected final Phaser _phaser = new Phaser(1);
+  protected final Future[] _futures;
+
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _futures = new Future[_numThreads];

Review comment:
       Thanks for pointing this out. I have fixed it in https://github.com/apache/incubator-pinot/pull/6678.






[GitHub] [incubator-pinot] siddharthteotia merged pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

siddharthteotia merged pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672


   




[GitHub] [incubator-pinot] mqliang closed pull request #6672: Extends SelectionOrderByCombineOperator from BaseCombineOperator

mqliang closed pull request #6672:
URL: https://github.com/apache/incubator-pinot/pull/6672


   

