You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/03/17 06:02:17 UTC

[incubator-pinot] branch master updated: Extends SelectionOrderByCombineOperator from BaseCombineOperator (#6672)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d1cf36a   Extends SelectionOrderByCombineOperator from BaseCombineOperator (#6672)
d1cf36a is described below

commit d1cf36a3cbf8653d7fdf49a3bfc422ee71d1d302
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Tue Mar 16 23:01:55 2021 -0700

     Extends SelectionOrderByCombineOperator from BaseCombineOperator (#6672)
    
    * Extends SelectionOrderByCombineOperator from BaseCombineOperator
    
    * put the try MinMaxValueBasedCombine logic into a separate function
---
 ...xValueBasedSelectionOrderByCombineOperator.java | 297 +++++++++++++++++++++
 .../core/operator/combine/MinMaxValueContext.java  |  36 +++
 .../combine/SelectionOrderByCombineOperator.java   | 247 ++---------------
 3 files changed, 361 insertions(+), 219 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
new file mode 100644
index 0000000..901d6bc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+
+/**
+ * Optimized combine operator for selection order-by queries.
+ * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on
+ * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
+ * <ul>
+ *   <li>1. Sort all the segments by the column min/max value</li>
+ *   <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
+ *   <li>3. Skip processing the segments that cannot add values to the final result</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "MinMaxValueBasedSelectionOrderByCombineOperator";
+
+  // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
+  // special IntermediateResultsBlock into the BlockingQueue to awake the main thread
+  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
+      new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
+          Collections.emptyList());
+
+  private final int _numOperators;
+  private final int numThreads;
+  // Use a BlockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
+  private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
+  private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();
+  private final int _numRowsToKeep;
+  private final List<MinMaxValueContext> _minMaxValueContexts;
+
+  public MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) {
+    super(operators, queryContext, executorService, endTimeMs);
+    _minMaxValueContexts = minMaxValueContexts;
+    _numOperators = _operators.size();
+    numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks
+   * into BlockingQueue, skip processing the segments if possible based on the column min/max value and keep enough
+   * documents to fulfill the LIMIT and OFFSET requirement.
+   */
+  protected void processSegments(int threadIndex) {
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    int numOrderByExpressions = orderByExpressions.size();
+    assert numOrderByExpressions > 0;
+    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
+    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
+    boolean asc = firstOrderByExpression.isAsc();
+
+    try {
+      // Register the thread to the _phaser
+      // NOTE: If the _phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      // Keep a boundary value for the thread
+      // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
+      //       value is updated after processing the segment, while global boundary value is updated after the
+      //       segment result is merged.
+      Comparable threadBoundaryValue = null;
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += numThreads) {
+        // Calculate the boundary value from global boundary and thread boundary
+        Comparable boundaryValue = _globalBoundaryValue.get();
+        if (boundaryValue == null) {
+          boundaryValue = threadBoundaryValue;
+        } else {
+          if (threadBoundaryValue != null) {
+            if (asc) {
+              if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
+                boundaryValue = threadBoundaryValue;
+              }
+            } else {
+              if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
+                boundaryValue = threadBoundaryValue;
+              }
+            }
+          }
+        }
+
+        // Check if the segment can be skipped
+        MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
+        if (boundaryValue != null) {
+          if (asc) {
+            // For ascending order, no need to process more segments if the column min value is larger than the
+            // boundary value, or is equal to the boundary value and the there is only one order-by expression
+            if (minMaxValueContext._minValue != null) {
+              int result = minMaxValueContext._minValue.compareTo(boundaryValue);
+              if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
+                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads);
+                _blockingQueue.offer(LAST_RESULTS_BLOCK);
+                return;
+              }
+            }
+          } else {
+            // For descending order, no need to process more segments if the column max value is smaller than the
+            // boundary value, or is equal to the boundary value and the there is only one order-by expression
+            if (minMaxValueContext._maxValue != null) {
+              int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
+              if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
+                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads);
+                _blockingQueue.offer(LAST_RESULTS_BLOCK);
+                return;
+              }
+            }
+          }
+        }
+
+        // Process the segment
+        try {
+          IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+          PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+          if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+            // Segment result has enough rows, update the boundary value
+            assert selectionResult.peek() != null;
+            Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
+            if (boundaryValue == null) {
+              boundaryValue = segmentBoundaryValue;
+            } else {
+              if (asc) {
+                if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
+                  boundaryValue = segmentBoundaryValue;
+                }
+              } else {
+                if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
+                  boundaryValue = segmentBoundaryValue;
+                }
+              }
+            }
+          }
+          threadBoundaryValue = boundaryValue;
+          _blockingQueue.offer(resultsBlock);
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER
+              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+                  e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
+        }
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Combines intermediate selection result blocks from underlying operators and returns a merged one.
+   * <ul>
+   *   <li>
+   *     Merges multiple intermediate selection result blocks as a merged one.
+   *   </li>
+   *   <li>
+   *     Set all exceptions encountered during execution into the merged result block
+   *   </li>
+   * </ul>
+   */
+  @Override
+  protected IntermediateResultsBlock mergeResultsFromSegments() {
+    IntermediateResultsBlock mergedBlock = null;
+    try {
+      int numBlocksMerged = 0;
+      while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
+        IntermediateResultsBlock blockToMerge =
+            _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        if (blockToMerge == null) {
+          // Query times out, skip merging the remaining results blocks
+          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+              _queryContext);
+          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
+          break;
+        }
+        if (blockToMerge.getProcessingExceptions() != null) {
+          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
+          // the exception
+          mergedBlock = blockToMerge;
+          break;
+        }
+        if (mergedBlock == null) {
+          mergedBlock = blockToMerge;
+        } else {
+          if (blockToMerge != LAST_RESULTS_BLOCK) {
+            mergeResultsBlocks(mergedBlock, blockToMerge);
+          }
+        }
+        numBlocksMerged++;
+
+        // Update the boundary value if enough rows are collected
+        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+          assert selectionResult.peek() != null;
+          _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : _futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+    }
+    return mergedBlock;
+  }
+
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    DataSchema mergedDataSchema = mergedBlock.getDataSchema();
+    DataSchema dataSchemaToMerge = blockToMerge.getDataSchema();
+    assert mergedDataSchema != null && dataSchemaToMerge != null;
+    if (!mergedDataSchema.equals(dataSchemaToMerge)) {
+      String errorMessage = String
+          .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge",
+              mergedDataSchema, dataSchemaToMerge);
+      // NOTE: This is segment level log, so log at debug level to prevent flooding the log.
+      LOGGER.debug(errorMessage);
+      mergedBlock
+          .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
+      return;
+    }
+
+    PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+    Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult();
+    assert mergedRows != null && rowsToMerge != null;
+    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
new file mode 100644
index 0000000..3f76058
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+
+
+public class MinMaxValueContext {
+  final SelectionOrderByOperator _operator;
+  final Comparable _minValue;
+  final Comparable _maxValue;
+
+  MinMaxValueContext(SelectionOrderByOperator operator, String column) {
+    _operator = operator;
+    DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+    _minValue = dataSourceMetadata.getMinValue();
+    _maxValue = dataSourceMetadata.getMaxValue();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index f95908f..be899fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -20,57 +20,44 @@ package org.apache.pinot.core.operator.combine;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import org.apache.pinot.core.query.exception.EarlyTerminationException;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.core.util.trace.TraceRunnable;
 
 
 /**
  * Combine operator for selection order-by queries.
- * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on
- * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
- * <ul>
- *   <li>1. Sort all the segments by the column min/max value</li>
- *   <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
- *   <li>3. Skip processing the segments that cannot add values to the final result</li>
- * </ul>
+ * <p>When the first order-by expression is an identifier (column), try to use
+ * {@link org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator} first, which will
+ * skip processing some segments based on the column min/max value. Otherwise fall back to the default combine
+ * (process all segments).
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class SelectionOrderByCombineOperator extends BaseCombineOperator {
   private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
 
-  // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
-  // special IntermediateResultsBlock into the BlockingQueue to awake the main thread
-  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
-      new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
-          Collections.emptyList());
-
+  private final List<Operator> _operators;
+  private final QueryContext _queryContext;
+  private final ExecutorService _executorService;
+  private final long _endTimeMs;
   private final int _numRowsToKeep;
 
   public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs) {
     super(operators, queryContext, executorService, endTimeMs);
+    _operators = operators;
+    _queryContext = queryContext;
+    _executorService = executorService;
+    _endTimeMs = endTimeMs;
     _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
   }
 
@@ -79,20 +66,29 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
     return OPERATOR_NAME;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks
+   * into BlockingQueue. Try to use
+   * {@link org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator} first, which
+   * will skip processing some segments based on the column min/max value. Otherwise fall back to the default combine
+   * (process all segments).
+   */
   @Override
   protected IntermediateResultsBlock getNextBlock() {
     List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
     assert orderByExpressions != null;
     if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
-      return minMaxValueBasedCombine();
+      return tryMinMaxValueBasedCombine(orderByExpressions);
     } else {
+      // Fall back to the default combine (process all segments) when segments have different data types for the first
+      // order-by column
       return super.getNextBlock();
     }
   }
 
-  private IntermediateResultsBlock minMaxValueBasedCombine() {
-    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
-    assert orderByExpressions != null;
+  private IntermediateResultsBlock tryMinMaxValueBasedCombine(List<OrderByExpressionContext> orderByExpressions) {
     int numOrderByExpressions = orderByExpressions.size();
     assert numOrderByExpressions > 0;
     OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
@@ -132,201 +128,14 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
         });
       }
     } catch (Exception e) {
-      // Fall back to the default combine (process all segments) when segments have different data types for the first
-      // order-by column
+      // Fall back to the default combine (process all segments) if there are any exceptions.
       LOGGER.warn("Segments have different data types for the first order-by column: {}, using the default combine",
           firstOrderByColumn);
       return super.getNextBlock();
     }
 
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-    AtomicReference<Comparable> globalBoundaryValue = new AtomicReference<>();
-
-    // Use a BlockingQueue to store the per-segment result
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
-    // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
-    AtomicInteger numOperatorsSkipped = new AtomicInteger();
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
-        @Override
-        public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
-
-            // Keep a boundary value for the thread
-            // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
-            //       value is updated after processing the segment, while global boundary value is updated after the
-            //       segment result is merged.
-            Comparable threadBoundaryValue = null;
-
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              // Calculate the boundary value from global boundary and thread boundary
-              Comparable boundaryValue = globalBoundaryValue.get();
-              if (boundaryValue == null) {
-                boundaryValue = threadBoundaryValue;
-              } else {
-                if (threadBoundaryValue != null) {
-                  if (asc) {
-                    if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
-                      boundaryValue = threadBoundaryValue;
-                    }
-                  } else {
-                    if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
-                      boundaryValue = threadBoundaryValue;
-                    }
-                  }
-                }
-              }
-
-              // Check if the segment can be skipped
-              MinMaxValueContext minMaxValueContext = minMaxValueContexts.get(operatorIndex);
-              if (boundaryValue != null) {
-                if (asc) {
-                  // For ascending order, no need to process more segments if the column min value is larger than the
-                  // boundary value, or is equal to the boundary value and the there is only one order-by expression
-                  if (minMaxValueContext._minValue != null) {
-                    int result = minMaxValueContext._minValue.compareTo(boundaryValue);
-                    if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
-                      numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
-                      blockingQueue.offer(LAST_RESULTS_BLOCK);
-                      return;
-                    }
-                  }
-                } else {
-                  // For descending order, no need to process more segments if the column max value is smaller than the
-                  // boundary value, or is equal to the boundary value and the there is only one order-by expression
-                  if (minMaxValueContext._maxValue != null) {
-                    int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
-                    if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
-                      numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
-                      blockingQueue.offer(LAST_RESULTS_BLOCK);
-                      return;
-                    }
-                  }
-                }
-              }
-
-              // Process the segment
-              try {
-                IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
-                PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
-                if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
-                  // Segment result has enough rows, update the boundary value
-                  assert selectionResult.peek() != null;
-                  Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
-                  if (boundaryValue == null) {
-                    boundaryValue = segmentBoundaryValue;
-                  } else {
-                    if (asc) {
-                      if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
-                        boundaryValue = segmentBoundaryValue;
-                      }
-                    } else {
-                      if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
-                        boundaryValue = segmentBoundaryValue;
-                      }
-                    }
-                  }
-                }
-                threadBoundaryValue = boundaryValue;
-                blockingQueue.offer(resultsBlock);
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
-          }
-        }
-      });
-    }
-
-    IntermediateResultsBlock mergedBlock = null;
-    try {
-      int numBlocksMerged = 0;
-      while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
-        IntermediateResultsBlock blockToMerge =
-            blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        if (blockToMerge == null) {
-          // Query times out, skip merging the remaining results blocks
-          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
-              _queryContext);
-          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-              new TimeoutException("Timed out while polling results block")));
-          break;
-        }
-        if (blockToMerge.getProcessingExceptions() != null) {
-          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
-          // the exception
-          mergedBlock = blockToMerge;
-          break;
-        }
-        if (mergedBlock == null) {
-          mergedBlock = blockToMerge;
-        } else {
-          if (blockToMerge != LAST_RESULTS_BLOCK) {
-            mergeResultsBlocks(mergedBlock, blockToMerge);
-          }
-        }
-        numBlocksMerged++;
-
-        // Update the boundary value if enough rows are collected
-        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
-        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
-          assert selectionResult.peek() != null;
-          globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
-        }
-      }
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
-      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-      // Deregister the main thread and wait for all threads done
-      phaser.awaitAdvance(phaser.arriveAndDeregister());
-    }
-
-    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
-    return mergedBlock;
-  }
-
-  private static class MinMaxValueContext {
-    final SelectionOrderByOperator _operator;
-    final Comparable _minValue;
-    final Comparable _maxValue;
-
-    MinMaxValueContext(SelectionOrderByOperator operator, String column) {
-      _operator = operator;
-      DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
-      _minValue = dataSourceMetadata.getMinValue();
-      _maxValue = dataSourceMetadata.getMaxValue();
-    }
+    return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService, _endTimeMs,
+        minMaxValueContexts).getNextBlock();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org