Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/23 17:35:06 UTC

[incubator-pinot] branch master updated: Early termination for combining selection order-by results (#5686)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 174eb4b  Early termination for combining selection order-by results (#5686)
174eb4b is described below

commit 174eb4b99c1ccc0645e1ba47eb3f26a1c816efb5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jul 23 10:34:56 2020 -0700

    Early termination for combining selection order-by results (#5686)
    
    Split `CombineOperator` into 3 different combine operators:
    - `SelectionOnlyCombineOperator`
    - `SelectionOrderByCombineOperator`
    - `AggregationOnlyCombineOperator`
    (For aggregation group-by, the combine operators are renamed to `GroupByCombineOperator` and `GroupByOrderByCombineOperator`)
    
    Re-implement the combine operator logic in `BaseCombineOperator` for the early-termination enhancement:
    - Worker threads no longer perform the result merge; instead, they insert the results block from each processed segment into a blocking queue
    - Worker threads can early-terminate if the results block from a single segment can already satisfy the query
    - The main thread merges the results blocks from the worker threads, and early-terminates once the merged results block can satisfy the query
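
    A minimal, runnable sketch of this producer/consumer pattern (hypothetical names and values, not the actual Pinot classes; row counts stand in for results blocks):

    ```java
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Workers produce per-segment results; the main thread merges them. Both
    // sides stop as soon as the query (LIMIT 100) is satisfied.
    public class CombineSketch {
      public static void main(String[] args) throws Exception {
        int numSegments = 8, numThreads = 4, limit = 100;
        List<Integer> segmentRowCounts = List.of(40, 40, 40, 40, 40, 40, 40, 40);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(numSegments);
        for (int t = 0; t < numThreads; t++) {
          final int thread = t;
          executor.submit(() -> {
            for (int i = thread; i < numSegments; i += numThreads) {
              int rows = segmentRowCounts.get(i); // stand-in for processing one segment
              queue.offer(rows);
              if (rows >= limit) {
                return; // this segment alone satisfies the query
              }
            }
          });
        }
        int merged = 0;
        for (int i = 0; i < numSegments; i++) {
          Integer rows = queue.poll(1, TimeUnit.SECONDS);
          if (rows == null) {
            break; // timed out waiting for a block
          }
          merged += rows; // stand-in for merging two results blocks
          if (merged >= limit) {
            break; // merged result satisfies the query: early-terminate
          }
        }
        executor.shutdownNow();
        System.out.println("Collected " + merged + " rows before terminating");
      }
    }
    ```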
    
    For `SelectionOnlyCombineOperator`:
    - Only process 1 segment for `LIMIT 0`
    - Early termination when enough rows are collected
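
    A minimal sketch of the "enough rows" check, assuming each segment collects at most LIMIT rows (illustrative, not the Pinot implementation):

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class SelectionOnlySatisfiedSketch {
      // A block holding exactly `limit` rows cannot be changed by more segments,
      // because per-segment collection is capped at `limit` rows.
      static boolean isQuerySatisfied(List<Object[]> rows, int limit) {
        return rows != null && rows.size() == limit;
      }

      public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
          rows.add(new Object[]{i});
        }
        System.out.println(isQuerySatisfied(rows, 10)); // true -> skip remaining segments
      }
    }
    ```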
    
    For `SelectionOrderByCombineOperator`:
    - If the first order-by expression is an identifier (column), sort the segments by the column min/max value and early-terminate once the results from the remaining segments cannot make it into the final result.
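
    A minimal sketch of the min/max pruning idea for ascending order on a single integer column (hypothetical data; the real logic in `SelectionOrderByCombineOperator` below also handles descending order, OFFSET, and equality with multiple order-by expressions):

    ```java
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MinMaxPruneSketch {
      static class Segment {
        final int _minValue;
        final int[] _values;

        Segment(int minValue, int[] values) {
          _minValue = minValue;
          _values = values;
        }
      }

      public static void main(String[] args) {
        int k = 3; // LIMIT
        // Segments pre-sorted by column min value, ascending
        List<Segment> segments = Arrays.asList(
            new Segment(1, new int[]{1, 2, 9}),
            new Segment(3, new int[]{3, 4, 8}),
            new Segment(7, new int[]{7, 7, 7}));
        // Max-heap of the k smallest values seen so far; the top is the boundary
        PriorityQueue<Integer> topK = new PriorityQueue<>(Comparator.reverseOrder());
        for (Segment segment : segments) {
          if (topK.size() == k && segment._minValue > topK.peek()) {
            // Segments are sorted by min value, so the rest can be skipped too
            System.out.println("Skipping remaining segments from min value " + segment._minValue);
            break;
          }
          for (int value : segment._values) {
            topK.offer(value);
            if (topK.size() > k) {
              topK.poll();
            }
          }
        }
        System.out.println("Top " + k + " (heap order): " + topK); // contains 1, 2, 3
      }
    }
    ```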
---
 .../pinot/core/operator/CombineOperator.java       | 226 -------------
 .../pinot/core/operator/ExecutionStatistics.java   |  46 +--
 .../operator/blocks/IntermediateResultsBlock.java  |  52 ++-
 .../combine/AggregationOnlyCombineOperator.java    |  58 ++++
 .../core/operator/combine/BaseCombineOperator.java | 185 +++++++++++
 .../operator/combine/CombineOperatorUtils.java     |  74 +++++
 .../GroupByCombineOperator.java}                   |  27 +-
 .../GroupByOrderByCombineOperator.java}            |  30 +-
 .../combine/SelectionOnlyCombineOperator.java      |  95 ++++++
 .../combine/SelectionOrderByCombineOperator.java   | 356 +++++++++++++++++++++
 .../operator/query/SelectionOrderByOperator.java   |  18 +-
 .../apache/pinot/core/plan/CombinePlanNode.java    |  36 ++-
 .../pinot/core/query/reduce/CombineService.java    | 145 ---------
 .../{ => combine}/CombineSlowOperatorsTest.java    |  36 ++-
 .../combine/SelectionCombineOperatorTest.java      | 243 ++++++++++++++
 .../pinot/queries/BaseSingleValueQueriesTest.java  |  12 +-
 .../queries/SelectionOnlyEarlyTerminationTest.java | 124 -------
 17 files changed, 1139 insertions(+), 624 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
deleted file mode 100644
index accd0f0..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.query.exception.EarlyTerminationException;
-import org.apache.pinot.core.query.reduce.CombineService;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
-import org.apache.pinot.core.util.trace.TraceRunnable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>CombineOperator</code> class is the operator to combine selection results and aggregation only results.
- */
-@SuppressWarnings("rawtypes")
-public class CombineOperator extends BaseOperator<IntermediateResultsBlock> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(CombineOperator.class);
-  private static final String OPERATOR_NAME = "CombineOperator";
-
-  // Use at most 10 or half of the processors threads for each query.
-  // If there are less than 2 processors, use 1 thread.
-  // Runtime.getRuntime().availableProcessors() may return value < 2 in container based environment, e.g. Kubernetes.
-  public static final int MAX_NUM_THREADS_PER_QUERY =
-      Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
-
-  private final List<Operator> _operators;
-  private final QueryContext _queryContext;
-  private final ExecutorService _executorService;
-  private final long _timeOutMs;
-
-  public CombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
-      long timeOutMs) {
-    _operators = operators;
-    _queryContext = queryContext;
-    _executorService = executorService;
-    _timeOutMs = timeOutMs;
-  }
-
-  @Override
-  protected IntermediateResultsBlock getNextBlock() {
-    long startTimeMs = System.currentTimeMillis();
-    long endTimeMs = startTimeMs + _timeOutMs;
-    int numOperators = _operators.size();
-    // Try to use all MAX_NUM_THREADS_PER_QUERY threads for the query, but ensure each thread has at least one operator
-    int numThreads = Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY);
-
-    // We use a BlockingQueue to store the results for each operator group, and track if all operator groups are
-    // finished by the query timeout, and cancel the unfinished futures (try to interrupt the execution if it already
-    // started).
-    // Besides the BlockingQueue, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or
-    // interrupted) before the main thread returns. We need to ensure no execution left before the main thread returning
-    // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the
-    // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash)
-    // when executing queries against them.
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numThreads);
-    Phaser phaser = new Phaser(1);
-
-    // Submit operator group execution jobs
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      int index = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
-        @Override
-        public void runJob() {
-          try {
-            // Register the thread to the phaser.
-            // If the phaser is terminated (returning negative value) when trying to register the thread, that means the
-            // query execution has timed out, and the main thread has deregistered itself and returned the result.
-            // Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
-
-            IntermediateResultsBlock mergedBlock = (IntermediateResultsBlock) _operators.get(index).nextBlock();
-            for (int i = index + numThreads; i < numOperators; i += numThreads) {
-              if (isQuerySatisfied(_queryContext, mergedBlock)) {
-                break;
-              }
-              IntermediateResultsBlock blockToMerge = (IntermediateResultsBlock) _operators.get(i).nextBlock();
-              try {
-                CombineService.mergeTwoBlocks(_queryContext, mergedBlock, blockToMerge);
-              } catch (Exception e) {
-                LOGGER.error("Caught exception while merging two blocks (step 1).", e);
-                mergedBlock
-                    .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e));
-              }
-            }
-            blockingQueue.offer(mergedBlock);
-          } catch (EarlyTerminationException e) {
-            // Early-terminated because query times out or is already satisfied
-          } catch (Exception e) {
-            LOGGER.error("Caught exception while executing query.", e);
-            blockingQueue.offer(new IntermediateResultsBlock(e));
-          } finally {
-            phaser.arriveAndDeregister();
-          }
-        }
-      });
-    }
-
-    // Submit operator groups merge job
-    Future<IntermediateResultsBlock> mergedBlockFuture =
-        _executorService.submit(new TraceCallable<IntermediateResultsBlock>() {
-          @Override
-          public IntermediateResultsBlock callJob()
-              throws Exception {
-            IntermediateResultsBlock mergedBlock =
-                blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-            if (mergedBlock == null) {
-              throw new TimeoutException("Timed out while polling result from first thread");
-            }
-            int numMergedBlocks = 1;
-            while (numMergedBlocks < numThreads) {
-              if (isQuerySatisfied(_queryContext, mergedBlock)) {
-                break;
-              }
-              IntermediateResultsBlock blockToMerge =
-                  blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-              if (blockToMerge == null) {
-                throw new TimeoutException("Timed out while polling result from thread: " + numMergedBlocks);
-              }
-              try {
-                CombineService.mergeTwoBlocks(_queryContext, mergedBlock, blockToMerge);
-              } catch (Exception e) {
-                LOGGER.error("Caught exception while merging two blocks (step 2).", e);
-                mergedBlock
-                    .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e));
-              }
-              numMergedBlocks++;
-            }
-            return mergedBlock;
-          }
-        });
-
-    // Get merge results.
-    IntermediateResultsBlock mergedBlock;
-    try {
-      mergedBlock = mergedBlockFuture.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.error("Caught InterruptedException. (queryContext = {})", _queryContext, e);
-      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.FUTURE_CALL_ERROR, e));
-    } catch (ExecutionException e) {
-      LOGGER.error("Caught ExecutionException. (queryContext = {})", _queryContext, e);
-      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e));
-    } catch (TimeoutException e) {
-      LOGGER.error("Caught TimeoutException. (queryContext = {})", _queryContext, e);
-      mergedBlockFuture.cancel(true);
-      mergedBlock =
-          new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, e));
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-      // Deregister the main thread and wait for all threads done
-      phaser.awaitAdvance(phaser.arriveAndDeregister());
-    }
-
-    // Update execution statistics.
-    ExecutionStatistics executionStatistics = new ExecutionStatistics();
-    for (Operator operator : _operators) {
-      executionStatistics.merge(operator.getExecutionStatistics());
-    }
-    mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
-    mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
-    mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
-    mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs());
-    mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
-    mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
-
-    return mergedBlock;
-  }
-
-  /**
-   * Returns {@code true} if the query is already satisfied with the IntermediateResultsBlock so that there is no need
-   * to process more segments, {@code false} otherwise.
-   * <p>For selection-only query, the query is satisfied when enough records are gathered.
-   */
-  private boolean isQuerySatisfied(QueryContext queryContext, IntermediateResultsBlock mergedBlock) {
-    if (!QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getOrderByExpressions() == null) {
-      // Selection-only
-      Collection<Object[]> selectionResult = mergedBlock.getSelectionResult();
-      return selectionResult != null && selectionResult.size() >= queryContext.getLimit();
-    }
-    return false;
-  }
-
-  @Override
-  public String getOperatorName() {
-    return OPERATOR_NAME;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java
index 1c99d24..04c2665 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java
@@ -23,7 +23,7 @@ package org.apache.pinot.core.operator;
  */
 public class ExecutionStatistics {
   // The number of documents scanned post filtering.
-  private long _numDocsScanned;
+  private final long _numDocsScanned;
   // The number of entries (single value entry contains 1 value, multi-value entry may contain multiple values) scanned
   // in the filtering phase of the query execution: could be larger than the total scanned doc num because of multiple
   // filtering predicates and multi-value entry.
@@ -31,15 +31,13 @@ public class ExecutionStatistics {
   // _numEntriesScannedInFilter counts values in a multi-value entry multiple times whereas in
   // _numEntriesScannedPostFilter counts all values in a multi-value entry only once. We can add a new stats
   // _numValuesScannedInFilter to replace _numEntriesScannedInFilter.
-  private long _numEntriesScannedInFilter;
+  private final long _numEntriesScannedInFilter;
   // Equal to numDocsScanned * numberProjectedColumns.
-  private long _numEntriesScannedPostFilter;
-  private long _numTotalDocs;
-  private long _numSegmentsProcessed;
-  private long _numSegmentsMatched;
+  private final long _numEntriesScannedPostFilter;
 
-  public ExecutionStatistics() {
-  }
+  // TODO: Remove _numTotalDocs because it is not execution stats, and it is not set from the operators because they
+  //       don't cover the pruned segments
+  private final long _numTotalDocs;
 
   public ExecutionStatistics(long numDocsScanned, long numEntriesScannedInFilter, long numEntriesScannedPostFilter,
       long numTotalDocs) {
@@ -47,8 +45,6 @@ public class ExecutionStatistics {
     _numEntriesScannedInFilter = numEntriesScannedInFilter;
     _numEntriesScannedPostFilter = numEntriesScannedPostFilter;
     _numTotalDocs = numTotalDocs;
-    _numSegmentsProcessed = 1;
-    _numSegmentsMatched = (numDocsScanned == 0) ? 0 : 1;
   }
 
   public long getNumDocsScanned() {
@@ -66,34 +62,4 @@ public class ExecutionStatistics {
   public long getNumTotalDocs() {
     return _numTotalDocs;
   }
-
-  public long getNumSegmentsProcessed() {
-    return _numSegmentsProcessed;
-  }
-
-  public long getNumSegmentsMatched() {
-    return _numSegmentsMatched;
-  }
-
-  /**
-   * Merge another execution statistics into the current one.
-   *
-   * @param executionStatisticsToMerge execution statistics to merge.
-   */
-  public void merge(ExecutionStatistics executionStatisticsToMerge) {
-    _numDocsScanned += executionStatisticsToMerge._numDocsScanned;
-    _numEntriesScannedInFilter += executionStatisticsToMerge._numEntriesScannedInFilter;
-    _numEntriesScannedPostFilter += executionStatisticsToMerge._numEntriesScannedPostFilter;
-    _numTotalDocs += executionStatisticsToMerge._numTotalDocs;
-    _numSegmentsProcessed += executionStatisticsToMerge._numSegmentsProcessed;
-    _numSegmentsMatched += executionStatisticsToMerge._numSegmentsMatched;
-  }
-
-  @Override
-  public String toString() {
-    return "Execution Statistics:" + "\n  numDocsScanned: " + _numDocsScanned + "\n  numEntriesScannedInFilter: "
-        + _numEntriesScannedInFilter + "\n  numEntriesScannedPostFilter: " + _numEntriesScannedPostFilter
-        + "\n  numTotalDocs: " + _numTotalDocs + "\n  numSegmentsProcessed: " + _numSegmentsProcessed
-        + "\n  numSegmentsMatched: " + _numSegmentsMatched;
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 63efe7c..d0eb2e2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -60,8 +61,8 @@ public class IntermediateResultsBlock implements Block {
   private long _numEntriesScannedInFilter;
   private long _numEntriesScannedPostFilter;
   private long _numTotalDocs;
-  private long _numSegmentsProcessed;
-  private long _numSegmentsMatched;
+  private int _numSegmentsProcessed;
+  private int _numSegmentsMatched;
   private boolean _numGroupsLimitReached;
 
   private Table _table;
@@ -198,19 +199,11 @@ public class IntermediateResultsBlock implements Block {
     _numEntriesScannedPostFilter = numEntriesScannedPostFilter;
   }
 
-  public long getNumSegmentsProcessed() {
-    return _numSegmentsProcessed;
-  }
-
-  public void setNumSegmentsProcessed(long numSegmentsProcessed) {
+  public void setNumSegmentsProcessed(int numSegmentsProcessed) {
     _numSegmentsProcessed = numSegmentsProcessed;
   }
 
-  public long getNumSegmentsMatched() {
-    return _numSegmentsMatched;
-  }
-
-  public void setNumSegmentsMatched(long numSegmentsMatched) {
+  public void setNumSegmentsMatched(int numSegmentsMatched) {
     _numSegmentsMatched = numSegmentsMatched;
   }
 
@@ -222,6 +215,41 @@ public class IntermediateResultsBlock implements Block {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  @VisibleForTesting
+  public long getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  @VisibleForTesting
+  public long getNumEntriesScannedInFilter() {
+    return _numEntriesScannedInFilter;
+  }
+
+  @VisibleForTesting
+  public long getNumEntriesScannedPostFilter() {
+    return _numEntriesScannedPostFilter;
+  }
+
+  @VisibleForTesting
+  public int getNumSegmentsProcessed() {
+    return _numSegmentsProcessed;
+  }
+
+  @VisibleForTesting
+  public int getNumSegmentsMatched() {
+    return _numSegmentsMatched;
+  }
+
+  @VisibleForTesting
+  public long getNumTotalDocs() {
+    return _numTotalDocs;
+  }
+
+  @VisibleForTesting
+  public boolean isNumGroupsLimitReached() {
+    return _numGroupsLimitReached;
+  }
+
   public DataTable getDataTable()
       throws Exception {
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
new file mode 100644
index 0000000..83a474e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Combine operator for aggregation only queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class AggregationOnlyCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator";
+
+  public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long timeOutMs) {
+    super(operators, queryContext, executorService, timeOutMs);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    AggregationFunction[] aggregationFunctions = mergedBlock.getAggregationFunctions();
+    List<Object> mergedResults = mergedBlock.getAggregationResult();
+    List<Object> resultsToMerge = blockToMerge.getAggregationResult();
+    assert aggregationFunctions != null && mergedResults != null && resultsToMerge != null;
+
+    int numAggregationFunctions = aggregationFunctions.length;
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      mergedResults.set(i, aggregationFunctions[i].merge(mergedResults.get(i), resultsToMerge.get(i)));
+    }
+  }
+}
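
To make the pairwise merge above concrete, here is a hand-rolled illustration with hypothetical COUNT and MAX partial results (the arithmetic stands in for `AggregationFunction.merge`; this is not Pinot code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AggregationMergeSketch {
  public static void main(String[] args) {
    // Per-segment partial results for [COUNT(*), MAX(col)]
    List<Object> merged = new ArrayList<>(Arrays.asList(100L, 7.5));
    List<Object> toMerge = Arrays.asList(250L, 9.0);
    // COUNT partials add up; MAX keeps the larger value
    merged.set(0, (Long) merged.get(0) + (Long) toMerge.get(0));
    merged.set(1, Math.max((Double) merged.get(1), (Double) toMerge.get(1)));
    System.out.println(merged); // [350, 9.0]
  }
}
```
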
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
new file mode 100644
index 0000000..2d10a35
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base implementation of the combine operator.
+ * <p>Combine operator uses multiple worker threads to process segments in parallel, and uses the main thread to merge
+ * the results blocks from the processed segments. It can early-terminate the query to save the system resources if it
+ * detects that the merged results can already satisfy the query, or the query is already errored out or timed out.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
+
+  protected final List<Operator> _operators;
+  protected final QueryContext _queryContext;
+  protected final ExecutorService _executorService;
+  protected final long _timeOutMs;
+
+  public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+      long timeOutMs) {
+    _operators = operators;
+    _queryContext = queryContext;
+    _executorService = executorService;
+    _timeOutMs = timeOutMs;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + _timeOutMs;
+    int numOperators = _operators.size();
+    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
+
+    // Use a BlockingQueue to store the per-segment result
+    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
+    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+    // behavior (even JVM crash) when processing queries against it.
+    Phaser phaser = new Phaser(1);
+
+    Future[] futures = new Future[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      int threadIndex = i;
+      futures[i] = _executorService.submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          try {
+            // Register the thread to the phaser
+            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+            //       means the query execution has finished, and the main thread has deregistered itself and returned
+            //       the result. Directly return as no execution result will be taken.
+            if (phaser.register() < 0) {
+              return;
+            }
+
+            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
+              try {
+                IntermediateResultsBlock resultsBlock =
+                    (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+                if (isQuerySatisfied(resultsBlock)) {
+                  // Query is satisfied, skip processing the remaining segments
+                  blockingQueue.offer(resultsBlock);
+                  return;
+                } else {
+                  blockingQueue.offer(resultsBlock);
+                }
+              } catch (EarlyTerminationException e) {
+                // Early-terminated by interruption (canceled by the main thread)
+                return;
+              } catch (Exception e) {
+                // Caught exception, skip processing the remaining operators
+                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
+                    _queryContext, e);
+                blockingQueue.offer(new IntermediateResultsBlock(e));
+                return;
+              }
+            }
+          } finally {
+            phaser.arriveAndDeregister();
+          }
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = null;
+    try {
+      int numBlocksMerged = 0;
+      while (numBlocksMerged < numOperators) {
+        IntermediateResultsBlock blockToMerge =
+            blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        if (blockToMerge == null) {
+          // Query times out, skip merging the remaining results blocks
+          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+              _queryContext);
+          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
+          break;
+        }
+        if (blockToMerge.getProcessingExceptions() != null) {
+          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
+          // the exception
+          mergedBlock = blockToMerge;
+          break;
+        }
+        if (mergedBlock == null) {
+          mergedBlock = blockToMerge;
+        } else {
+          mergeResultsBlocks(mergedBlock, blockToMerge);
+        }
+        numBlocksMerged++;
+        if (isQuerySatisfied(mergedBlock)) {
+          // Query is satisfied, skip merging the remaining results blocks
+          break;
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
+    }
+
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
+
+  /**
+   * Can be overridden for early termination.
+   */
+  protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) {
+    return false;
+  }
+
+  /**
+   * Merge an IntermediateResultsBlock into the main IntermediateResultsBlock.
+   * <p>NOTE: {@code blockToMerge} should contain the result for a segment without any exception. The errored segment
+   * result is already handled.
+   */
+  protected abstract void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
+      IntermediateResultsBlock blockToMerge);
+}
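
A self-contained sketch of the template-method structure this base class sets up (illustrative names, not Pinot classes): subclasses supply only the merge step and, optionally, a cheap early-termination check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineTemplateSketch {
  abstract static class Base {
    // Default: never early-terminate; subclasses override with a cheap check.
    boolean isQuerySatisfied(List<Object[]> mergedRows) {
      return false;
    }

    abstract void merge(List<Object[]> mergedRows, List<Object[]> rowsToMerge);
  }

  static class SelectionOnly extends Base {
    final int _limit;

    SelectionOnly(int limit) {
      _limit = limit;
    }

    @Override
    boolean isQuerySatisfied(List<Object[]> mergedRows) {
      return mergedRows.size() >= _limit; // enough rows collected
    }

    @Override
    void merge(List<Object[]> mergedRows, List<Object[]> rowsToMerge) {
      for (Object[] row : rowsToMerge) {
        if (mergedRows.size() >= _limit) {
          return; // keep at most LIMIT rows
        }
        mergedRows.add(row);
      }
    }
  }

  public static void main(String[] args) {
    Base combine = new SelectionOnly(2);
    List<Object[]> merged = new ArrayList<>();
    combine.merge(merged, Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
    System.out.println(combine.isQuerySatisfied(merged)); // true after 2 rows
  }
}
```
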
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
new file mode 100644
index 0000000..fd7737f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+
+
+@SuppressWarnings("rawtypes")
+public class CombineOperatorUtils {
+  private CombineOperatorUtils() {
+  }
+
+  /**
+   * Use at most 10 threads or half of the available processors for each query. If there are fewer than 2 processors, use 1 thread.
+   * <p>NOTE: Runtime.getRuntime().availableProcessors() may return value < 2 in container based environment, e.g.
+   *          Kubernetes.
+   */
+  public static final int MAX_NUM_THREADS_PER_QUERY =
+      Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
+
+  /**
+   * Returns the number of threads used to execute the query in parallel.
+   */
+  public static int getNumThreadsForQuery(int numOperators) {
+    return Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY);
+  }
+
+  /**
+   * Sets the execution statistics into the results block.
+   */
+  public static void setExecutionStatistics(IntermediateResultsBlock resultsBlock, List<Operator> operators) {
+    int numSegmentsProcessed = operators.size();
+    int numSegmentsMatched = 0;
+    long numDocsScanned = 0;
+    long numEntriesScannedInFilter = 0;
+    long numEntriesScannedPostFilter = 0;
+    long numTotalDocs = 0;
+    for (Operator operator : operators) {
+      ExecutionStatistics executionStatistics = operator.getExecutionStatistics();
+      if (executionStatistics.getNumDocsScanned() > 0) {
+        numSegmentsMatched++;
+      }
+      numDocsScanned += executionStatistics.getNumDocsScanned();
+      numEntriesScannedInFilter += executionStatistics.getNumEntriesScannedInFilter();
+      numEntriesScannedPostFilter += executionStatistics.getNumEntriesScannedPostFilter();
+      numTotalDocs += executionStatistics.getNumTotalDocs();
+    }
+    resultsBlock.setNumSegmentsProcessed(numSegmentsProcessed);
+    resultsBlock.setNumSegmentsMatched(numSegmentsMatched);
+    resultsBlock.setNumDocsScanned(numDocsScanned);
+    resultsBlock.setNumEntriesScannedInFilter(numEntriesScannedInFilter);
+    resultsBlock.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter);
+    resultsBlock.setNumTotalDocs(numTotalDocs);
+  }
+}
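
For concreteness, the thread-count formula evaluates as follows on a few hypothetical host sizes:

```java
public class ThreadCountSketch {
  // max(1, min(10, availableProcessors / 2)), as defined above
  static int maxThreadsPerQuery(int availableProcessors) {
    return Math.max(1, Math.min(10, availableProcessors / 2));
  }

  public static void main(String[] args) {
    System.out.println(maxThreadsPerQuery(32)); // 10 (capped at 10)
    System.out.println(maxThreadsPerQuery(8));  // 4  (half the cores)
    System.out.println(maxThreadsPerQuery(1));  // 1  (the max(1, ...) floor)
    // getNumThreadsForQuery additionally caps at the number of segments:
    System.out.println(Math.min(3, maxThreadsPerQuery(8))); // 3
  }
}
```
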
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
similarity index 90%
rename from pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 5b6b9e6..476b459 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.combine;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
@@ -48,12 +49,15 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * The <code>CombineGroupByOperator</code> class is the operator to combine aggregation group-by results.
+ * Combine operator for aggregation group-by queries with PQL semantic.
+ * TODO:
+ *   - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using all threads
+ *   - Try to extend BaseCombineOperator to reduce duplicate code
  */
 @SuppressWarnings("rawtypes")
-public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBlock> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOperator.class);
-  private static final String OPERATOR_NAME = "CombineGroupByOperator";
+public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GroupByCombineOperator.class);
+  private static final String OPERATOR_NAME = "GroupByCombineOperator";
 
   // Use a higher limit for groups stored across segments. For most cases, most groups from each segment should be the
   // same, thus the total number of groups across segments should be equal or slightly higher than the number of groups
@@ -69,7 +73,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
   private final int _innerSegmentNumGroupsLimit;
   private final int _interSegmentNumGroupsLimit;
 
-  public CombineGroupByOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+  public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long timeOutMs, int innerSegmentNumGroupsLimit) {
     _operators = operators;
     _queryContext = queryContext;
@@ -208,16 +212,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
       }
 
       // Set the execution statistics.
-      ExecutionStatistics executionStatistics = new ExecutionStatistics();
-      for (Operator operator : _operators) {
-        executionStatistics.merge(operator.getExecutionStatistics());
-      }
-      mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
-      mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
-      mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
-      mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
-      mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
-      mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs());
+      CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
 
       // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
       //       are comparing number of groups across segments with the groups limit for each segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
similarity index 88%
rename from pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index f0001af..627e9f4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.combine;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
@@ -53,15 +54,15 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * The <code>CombineGroupByOrderByOperator</code> class is the operator to combine aggregation results with group-by and order by.
+ * Combine operator for aggregation group-by queries with SQL semantic.
+ * TODO:
+ *   - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using all threads
+ *   - Try to extend BaseCombineOperator to reduce duplicate code
  */
-// TODO: this class has a lot of duplication with {@link CombineGroupByOperator}.
-// These 2 classes can be combined into one
-// For the first iteration of Order By support, these will be separate
 @SuppressWarnings("rawtypes")
-public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResultsBlock> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOrderByOperator.class);
-  private static final String OPERATOR_NAME = "CombineGroupByOrderByOperator";
+public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
+  private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
 
   private final List<Operator> _operators;
   private final QueryContext _queryContext;
@@ -72,7 +73,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
   private DataSchema _dataSchema;
   private ConcurrentIndexedTable _indexedTable;
 
-  public CombineGroupByOrderByOperator(List<Operator> operators, QueryContext queryContext,
+  public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService, long timeOutMs) {
     _operators = operators;
     _queryContext = queryContext;
@@ -220,16 +221,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
       }
 
       // Set the execution statistics.
-      ExecutionStatistics executionStatistics = new ExecutionStatistics();
-      for (Operator operator : _operators) {
-        executionStatistics.merge(operator.getExecutionStatistics());
-      }
-      mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
-      mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
-      mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
-      mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
-      mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
-      mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs());
+      CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
 
       if (_indexedTable.size() >= _indexedTableCapacity) {
         mergedBlock.setNumGroupsLimitReached(true);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
new file mode 100644
index 0000000..07aad33
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+
+/**
+ * Combine operator for selection only queries.
+ * <p>For a query with LIMIT 0, directly use the main thread to process one segment to get the data schema of the query.
+ * <p>Query can be early-terminated when enough documents have been collected to fulfill the LIMIT requirement.
+ * <p>NOTE: Selection order-by query with LIMIT 0 is treated as selection only query.
+ */
+@SuppressWarnings("rawtypes")
+public class SelectionOnlyCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "SelectionOnlyCombineOperator";
+
+  private final int _numRowsToKeep;
+
+  public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long timeOutMs) {
+    super(operators, queryContext, executorService, timeOutMs);
+    _numRowsToKeep = queryContext.getLimit();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    // For LIMIT 0 query, only process one segment to get the data schema
+    if (_numRowsToKeep == 0) {
+      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(0).nextBlock();
+      CombineOperatorUtils.setExecutionStatistics(resultsBlock, _operators);
+      return resultsBlock;
+    }
+
+    return super.getNextBlock();
+  }
+
+  @Override
+  protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) {
+    Collection<Object[]> selectionResult = resultsBlock.getSelectionResult();
+    assert selectionResult != null;
+    return selectionResult.size() == _numRowsToKeep;
+  }
+
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    DataSchema mergedDataSchema = mergedBlock.getDataSchema();
+    DataSchema dataSchemaToMerge = blockToMerge.getDataSchema();
+    assert mergedDataSchema != null && dataSchemaToMerge != null;
+    if (!mergedDataSchema.equals(dataSchemaToMerge)) {
+      String errorMessage = String
+          .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge",
+              mergedDataSchema, dataSchemaToMerge);
+      // NOTE: This is segment level log, so log at debug level to prevent flooding the log.
+      LOGGER.debug(errorMessage);
+      mergedBlock
+          .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
+      return;
+    }
+
+    Collection<Object[]> mergedRows = mergedBlock.getSelectionResult();
+    Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult();
+    assert mergedRows != null && rowsToMerge != null;
+    SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+  }
+}
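
An illustrative stand-in for the `SelectionOperatorUtils.mergeWithoutOrdering` call above, assuming it simply appends rows until the merged block holds LIMIT rows (hypothetical helper, not the Pinot implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

public class MergeWithoutOrderingSketch {
  // Append rows from the block to merge until the merged block is full
  static void mergeWithoutOrdering(Collection<Object[]> merged, Collection<Object[]> toMerge, int limit) {
    for (Object[] row : toMerge) {
      if (merged.size() >= limit) {
        return; // LIMIT reached, drop the remaining rows
      }
      merged.add(row);
    }
  }

  public static void main(String[] args) {
    Collection<Object[]> merged = new ArrayList<>(Arrays.asList(new Object[]{1}, new Object[]{2}));
    Collection<Object[]> toMerge = Arrays.asList(new Object[]{3}, new Object[]{4}, new Object[]{5});
    mergeWithoutOrdering(merged, toMerge, 4);
    System.out.println(merged.size()); // 4 rows kept, the rest dropped
  }
}
```
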
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
new file mode 100644
index 0000000..af5b18e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+
+
+/**
+ * Combine operator for selection order-by queries.
+ * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on
+ * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
+ * <ul>
+ *   <li>1. Sort all the segments by the column min/max value</li>
+ *   <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
+ *   <li>3. Skip processing the segments that cannot add values to the final result</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SelectionOrderByCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
+
+  // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
+  // special IntermediateResultsBlock into the BlockingQueue to wake up the main thread
+  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
+      new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
+          Collections.emptyList());
+
+  private final int _numRowsToKeep;
+
+  public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long timeOutMs) {
+    super(operators, queryContext, executorService, timeOutMs);
+    _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
+      return minMaxValueBasedCombine();
+    } else {
+      return super.getNextBlock();
+    }
+  }
+
+  private IntermediateResultsBlock minMaxValueBasedCombine() {
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + _timeOutMs;
+
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    int numOrderByExpressions = orderByExpressions.size();
+    assert numOrderByExpressions > 0;
+    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
+    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
+    String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier();
+    boolean asc = firstOrderByExpression.isAsc();
+
+    int numOperators = _operators.size();
+    List<MinMaxValueContext> minMaxValueContexts = new ArrayList<>(numOperators);
+    for (Operator operator : _operators) {
+      minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
+    }
+    try {
+      if (asc) {
+        // For ascending order, sort on column min value in ascending order
+        minMaxValueContexts.sort((o1, o2) -> {
+          // Put segments without column min value in the front because we always need to process them
+          if (o1._minValue == null) {
+            return o2._minValue == null ? 0 : -1;
+          }
+          if (o2._minValue == null) {
+            return 1;
+          }
+          return o1._minValue.compareTo(o2._minValue);
+        });
+      } else {
+        // For descending order, sort on column max value in descending order
+        minMaxValueContexts.sort((o1, o2) -> {
+          // Put segments without column max value in the front because we always need to process them
+          if (o1._maxValue == null) {
+            return o2._maxValue == null ? 0 : -1;
+          }
+          if (o2._maxValue == null) {
+            return 1;
+          }
+          return o2._maxValue.compareTo(o1._maxValue);
+        });
+      }
+    } catch (Exception e) {
+      // Fall back to the default combine (process all segments) when segments have different data types for the first
+      // order-by column
+      LOGGER.warn("Segments have different data types for the first order-by column: {}, using the default combine",
+          firstOrderByColumn);
+      return super.getNextBlock();
+    }
+
+    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
+    AtomicReference<Comparable> globalBoundaryValue = new AtomicReference<>();
+
+    // Use a BlockingQueue to store the per-segment result
+    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
+    // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
+    AtomicInteger numOperatorsSkipped = new AtomicInteger();
+    // Use a Phaser to ensure all the Futures are done (either not scheduled, finished, or interrupted) before the
+    // main thread returns. We need to ensure this because the main thread holds the references to the segments. If a
+    // segment is deleted/refreshed, the segment will be released after the main thread returns, which would lead to
+    // undefined behavior (even a JVM crash) when processing queries against it.
+    Phaser phaser = new Phaser(1);
+
+    Future[] futures = new Future[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      int threadIndex = i;
+      futures[i] = _executorService.submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          try {
+            // Register the thread to the phaser
+            // NOTE: If the phaser is terminated (returning a negative value) when trying to register the thread, it
+            //       means the query execution has finished, and the main thread has deregistered itself and returned
+            //       the result. Return directly, as the execution result will no longer be used.
+            if (phaser.register() < 0) {
+              return;
+            }
+
+            // Keep a boundary value for the thread
+            // NOTE: The thread boundary value can be different from the global boundary value because the thread
+            //       boundary value is updated right after processing a segment, while the global boundary value is
+            //       updated only after the segment result is merged.
+            Comparable threadBoundaryValue = null;
+
+            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
+              // Calculate the boundary value from global boundary and thread boundary
+              Comparable boundaryValue = globalBoundaryValue.get();
+              if (boundaryValue == null) {
+                boundaryValue = threadBoundaryValue;
+              } else {
+                if (threadBoundaryValue != null) {
+                  if (asc) {
+                    if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
+                      boundaryValue = threadBoundaryValue;
+                    }
+                  } else {
+                    if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
+                      boundaryValue = threadBoundaryValue;
+                    }
+                  }
+                }
+              }
+
+              // Check if the segment can be skipped
+              MinMaxValueContext minMaxValueContext = minMaxValueContexts.get(operatorIndex);
+              if (boundaryValue != null) {
+                if (asc) {
+                  // For ascending order, no need to process more segments if the column min value is larger than the
+                  // boundary value, or is equal to the boundary value and there is only one order-by expression
+                  if (minMaxValueContext._minValue != null) {
+                    int result = minMaxValueContext._minValue.compareTo(boundaryValue);
+                    if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
+                      numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
+                      blockingQueue.offer(LAST_RESULTS_BLOCK);
+                      return;
+                    }
+                  }
+                } else {
+                  // For descending order, no need to process more segments if the column max value is smaller than the
+                  // boundary value, or is equal to the boundary value and there is only one order-by expression
+                  if (minMaxValueContext._maxValue != null) {
+                    int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
+                    if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
+                      numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
+                      blockingQueue.offer(LAST_RESULTS_BLOCK);
+                      return;
+                    }
+                  }
+                }
+              }
+
+              // Process the segment
+              try {
+                IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+                PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+                if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+                  // Segment result has enough rows, update the boundary value
+                  assert selectionResult.peek() != null;
+                  Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
+                  if (boundaryValue == null) {
+                    boundaryValue = segmentBoundaryValue;
+                  } else {
+                    if (asc) {
+                      if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
+                        boundaryValue = segmentBoundaryValue;
+                      }
+                    } else {
+                      if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
+                        boundaryValue = segmentBoundaryValue;
+                      }
+                    }
+                  }
+                }
+                threadBoundaryValue = boundaryValue;
+                blockingQueue.offer(resultsBlock);
+              } catch (EarlyTerminationException e) {
+                // Early-terminated by interruption (canceled by the main thread)
+                return;
+              } catch (Exception e) {
+                // Caught exception, skip processing the remaining operators
+                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
+                    _queryContext, e);
+                blockingQueue.offer(new IntermediateResultsBlock(e));
+                return;
+              }
+            }
+          } finally {
+            phaser.arriveAndDeregister();
+          }
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = null;
+    try {
+      int numBlocksMerged = 0;
+      while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
+        IntermediateResultsBlock blockToMerge =
+            blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        if (blockToMerge == null) {
+          // Query timed out; skip merging the remaining results blocks
+          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+              _queryContext);
+          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
+          break;
+        }
+        if (blockToMerge.getProcessingExceptions() != null) {
+          // Caught exception while processing a segment; skip merging the remaining results blocks and return the
+          // exception directly
+          mergedBlock = blockToMerge;
+          break;
+        }
+        if (mergedBlock == null) {
+          mergedBlock = blockToMerge;
+        } else {
+          if (blockToMerge != LAST_RESULTS_BLOCK) {
+            mergeResultsBlocks(mergedBlock, blockToMerge);
+          }
+        }
+        numBlocksMerged++;
+
+        // Update the boundary value if enough rows are collected
+        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+          assert selectionResult.peek() != null;
+          globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all worker threads to finish
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
+    }
+
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
+
+  private static class MinMaxValueContext {
+    final SelectionOrderByOperator _operator;
+    final Comparable _minValue;
+    final Comparable _maxValue;
+
+    MinMaxValueContext(SelectionOrderByOperator operator, String column) {
+      _operator = operator;
+      DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+      _minValue = dataSourceMetadata.getMinValue();
+      _maxValue = dataSourceMetadata.getMaxValue();
+    }
+  }
+
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    DataSchema mergedDataSchema = mergedBlock.getDataSchema();
+    DataSchema dataSchemaToMerge = blockToMerge.getDataSchema();
+    assert mergedDataSchema != null && dataSchemaToMerge != null;
+    if (!mergedDataSchema.equals(dataSchemaToMerge)) {
+      String errorMessage = String
+          .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge",
+              mergedDataSchema, dataSchemaToMerge);
+      // NOTE: This is a segment-level log, so log at DEBUG level to avoid flooding the log.
+      LOGGER.debug(errorMessage);
+      mergedBlock
+          .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
+      return;
+    }
+
+    PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+    Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult();
+    assert mergedRows != null && rowsToMerge != null;
+    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+  }
+}
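
A minimal, self-contained sketch of the min/max-value-based pruning idea used above; this is not Pinot code, and the names Segment and topKAscending are hypothetical. For an ascending order-by, segments are sorted by the column min value, and processing stops once a segment's min value exceeds the boundary value (the worst row currently held in the top-K heap):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class MinMaxPruningSketch {
  static class Segment {
    final int min;       // column min value, as exposed by segment metadata
    final int[] values;

    Segment(int... values) {
      this.values = values;
      this.min = Arrays.stream(values).min().orElse(Integer.MAX_VALUE);
    }
  }

  // Collect the k smallest values across segments, skipping segments that cannot
  // contribute to the final result
  static List<Integer> topKAscending(List<Segment> segments, int k) {
    // Sort segments by min value so the most promising ones are processed first
    segments.sort(Comparator.comparingInt((Segment s) -> s.min));
    // Max-heap holding the current k smallest values; the head is the boundary value
    PriorityQueue<Integer> topK = new PriorityQueue<>(Comparator.<Integer>reverseOrder());
    for (Segment segment : segments) {
      // Early termination: all remaining segments have an even larger min value
      if (topK.size() == k && segment.min > topK.peek()) {
        break;
      }
      for (int value : segment.values) {
        if (topK.size() < k) {
          topK.offer(value);
        } else if (value < topK.peek()) {
          topK.poll();
          topK.offer(value);
        }
      }
    }
    List<Integer> result = new ArrayList<>(topK);
    result.sort(null);
    return result;
  }

  public static void main(String[] args) {
    List<Segment> segments = new ArrayList<>(Arrays.asList(
        new Segment(0, 5, 9), new Segment(3, 4, 8), new Segment(100, 101)));
    System.out.println(topKAscending(segments, 3));  // [0, 3, 4]; the last segment is never scanned
  }
}

The operator above additionally skips on equality when there is only one order-by expression; the sketch keeps the strict comparison for brevity.
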
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index e13ef48..637672c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -166,6 +166,10 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
     };
   }
 
+  public IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
   @Override
   protected IntermediateResultsBlock getNextBlock() {
     if (_expressions.size() == _orderByExpressions.size()) {
@@ -183,6 +187,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
 
     // Fetch all the expressions and insert them into the priority queue
     BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
       for (int i = 0; i < numExpressions; i++) {
@@ -191,12 +196,12 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
       }
       RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
       int numDocsFetched = transformBlock.getNumDocs();
-      _numDocsScanned += numDocsFetched;
       for (int i = 0; i < numDocsFetched; i++) {
         SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep);
       }
+      _numDocsScanned += numDocsFetched;
+      _numEntriesScannedPostFilter += numDocsFetched * numColumnsProjected;
     }
-    _numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected();
 
     // Create the data schema
     String[] columnNames = new String[numExpressions];
@@ -221,6 +226,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
 
     // Fetch the order-by expressions and docIds and insert them into the priority queue
     BlockValSet[] blockValSets = new BlockValSet[numOrderByExpressions + 1];
+    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
       for (int i = 0; i < numOrderByExpressions; i++) {
@@ -230,7 +236,6 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
       blockValSets[numOrderByExpressions] = transformBlock.getBlockValueSet(BuiltInVirtualColumn.DOCID);
       RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
       int numDocsFetched = transformBlock.getNumDocs();
-      _numDocsScanned += numDocsFetched;
       for (int i = 0; i < numDocsFetched; i++) {
         // NOTE: We pre-allocate the complete row so that we can fill up the non-order-by output expression values later
         //       without creating extra rows or re-constructing the priority queue. We can change the values in-place
@@ -239,6 +244,8 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
         blockValueFetcher.getRow(i, row, 0);
         SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
       }
+      _numDocsScanned += numDocsFetched;
+      _numEntriesScannedPostFilter += numDocsFetched * numColumnsProjected;
     }
 
     // Copy the rows (shallow copy so that any modification will also be reflected to the priority queue) into a list,
@@ -261,6 +268,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
     for (ExpressionContext expressionContext : nonOrderByExpressions) {
       expressionContext.getColumns(columns);
     }
+    int numColumns = columns.size();
     Map<String, DataSource> dataSourceMap = new HashMap<>();
     for (String column : columns) {
       dataSourceMap.put(column, _indexSegment.getDataSource(column));
@@ -283,11 +291,9 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
       for (int i = 0; i < numDocsFetched; i++) {
         blockValueFetcher.getRow(i, rowList.get(rowBaseId + i), numOrderByExpressions);
       }
+      _numEntriesScannedPostFilter += numDocsFetched * numColumns;
       rowBaseId += numDocsFetched;
     }
-    _numEntriesScannedPostFilter =
-        (long) _numDocsScanned * _transformOperator.getNumColumnsProjected() + (long) numRows * transformOperator
-            .getNumColumnsProjected();
 
     // Create the data schema
     String[] columnNames = new String[numExpressions];
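
The statistics changes above move the counter updates into the per-block fetch loop. A hedged, stand-alone illustration of why incremental accumulation matters once a scan can stop between blocks; Block and the sample numbers are invented here:

import java.util.Arrays;
import java.util.List;

public class IncrementalStatsSketch {
  static class Block {
    final int numDocs;

    Block(int numDocs) {
      this.numDocs = numDocs;
    }
  }

  public static void main(String[] args) {
    int numColumnsProjected = 3;  // assumed projection width
    long numDocsScanned = 0;
    long numEntriesScannedPostFilter = 0;
    List<Block> blocks = Arrays.asList(new Block(100), new Block(40));
    for (Block block : blocks) {
      // Update the counters per block so they stay accurate even if the loop is
      // interrupted (e.g. by early termination) before all blocks are consumed.
      // A single post-loop assignment such as "entries = docs * columns" would never
      // run if an exception aborted the loop, leaving the statistic unset.
      numDocsScanned += block.numDocs;
      numEntriesScannedPostFilter += (long) block.numDocs * numColumnsProjected;
    }
    System.out.println(numDocsScanned + " docs, " + numEntriesScannedPostFilter + " entries");
  }
}
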
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 71a40b3..4f01fea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -26,10 +26,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.CombineGroupByOperator;
-import org.apache.pinot.core.operator.CombineGroupByOrderByOperator;
-import org.apache.pinot.core.operator.CombineOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator;
+import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
+import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
@@ -158,18 +160,26 @@ public class CombinePlanNode implements PlanNode {
       }
     }
 
-    // TODO: use the same combine operator for both aggregation and selection query.
-    if (QueryContextUtils.isAggregationQuery(_queryContext) && _queryContext.getGroupByExpressions() != null) {
-      // Aggregation group-by query
-      QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
-      // new Combine operator only when GROUP_BY_MODE explicitly set to SQL
-      if (queryOptions.isGroupByModeSQL()) {
-        return new CombineGroupByOrderByOperator(operators, _queryContext, _executorService, _timeOutMs);
+    if (QueryContextUtils.isAggregationQuery(_queryContext)) {
+      if (_queryContext.getGroupByExpressions() == null) {
+        // Aggregation only
+        return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+      } else {
+        // Aggregation group-by
+        QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
+        if (queryOptions.isGroupByModeSQL()) {
+          return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+        }
+        return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
       }
-      return new CombineGroupByOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
     } else {
-      // Selection or aggregation only query
-      return new CombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+      if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
+        // Selection only
+        return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+      } else {
+        // Selection order-by
+        return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+      }
     }
   }
 }
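
The dispatch above reduces to a decision over the query shape. A stand-alone sketch, with an invented method name and flags; only the operator names come from this commit:

public class CombineDispatchSketch {
  // Maps a query shape to the combine operator chosen by CombinePlanNode in this commit
  static String chooseCombineOperator(boolean isAggregation, boolean hasGroupBy, boolean groupByModeSql,
      int limit, boolean hasOrderBy) {
    if (isAggregation) {
      if (!hasGroupBy) {
        return "AggregationOnlyCombineOperator";
      }
      return groupByModeSql ? "GroupByOrderByCombineOperator" : "GroupByCombineOperator";
    }
    // Selection: LIMIT 0 and no order-by both take the selection-only path
    if (limit == 0 || !hasOrderBy) {
      return "SelectionOnlyCombineOperator";
    }
    return "SelectionOrderByCombineOperator";
  }

  public static void main(String[] args) {
    // Prints SelectionOrderByCombineOperator
    System.out.println(chooseCombineOperator(false, false, false, 10, true));
  }
}
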
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
deleted file mode 100644
index 80cf40e..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.reduce;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.PriorityQueue;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>CombineService</code> class provides the utility methods to combine {@link IntermediateResultsBlock}s.
- */
-@SuppressWarnings({"ConstantConditions", "rawtypes", "unchecked"})
-public class CombineService {
-  private CombineService() {
-  }
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(CombineService.class);
-
-  public static void mergeTwoBlocks(QueryContext queryContext, IntermediateResultsBlock mergedBlock,
-      IntermediateResultsBlock blockToMerge) {
-    // Combine processing exceptions.
-    List<ProcessingException> mergedProcessingExceptions = mergedBlock.getProcessingExceptions();
-    List<ProcessingException> processingExceptionsToMerge = blockToMerge.getProcessingExceptions();
-    if (mergedProcessingExceptions == null) {
-      mergedBlock.setProcessingExceptions(processingExceptionsToMerge);
-    } else if (processingExceptionsToMerge != null) {
-      mergedProcessingExceptions.addAll(processingExceptionsToMerge);
-    }
-
-    // Combine result.
-    if (QueryContextUtils.isAggregationQuery(queryContext)) {
-      // NOTE: Aggregation group-by queries should not reach here.
-      assert queryContext.getGroupByExpressions() == null;
-
-      // Combine aggregation-only result.
-      // Might be null if caught exception during query execution.
-      List<Object> aggregationResultToMerge = blockToMerge.getAggregationResult();
-      if (aggregationResultToMerge == null) {
-        // No data in block to merge.
-        return;
-      }
-
-      AggregationFunction[] mergedAggregationFunctions = mergedBlock.getAggregationFunctions();
-      if (mergedAggregationFunctions == null) {
-        // No data in merged block.
-        mergedBlock.setAggregationFunctions(blockToMerge.getAggregationFunctions());
-        mergedBlock.setAggregationResults(aggregationResultToMerge);
-      } else {
-        // Merge two blocks.
-        List<Object> mergedAggregationResult = mergedBlock.getAggregationResult();
-        int numAggregationFunctions = mergedAggregationFunctions.length;
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          mergedAggregationResult.set(i,
-              mergedAggregationFunctions[i].merge(mergedAggregationResult.get(i), aggregationResultToMerge.get(i)));
-        }
-      }
-    } else {
-      // Combine selection result.
-
-      // Data schema will be null if exceptions caught during query processing.
-      // Result set size will be zero if no row matches the predicate.
-      DataSchema mergedBlockSchema = mergedBlock.getDataSchema();
-      DataSchema blockToMergeSchema = blockToMerge.getDataSchema();
-      Collection<Object[]> mergedBlockResultSet = mergedBlock.getSelectionResult();
-      Collection<Object[]> blockToMergeResultSet = blockToMerge.getSelectionResult();
-
-      if (mergedBlockSchema == null || mergedBlockResultSet.size() == 0) {
-        // No data in merged block.
-
-        // If block to merge schema is not null, set its data schema and result to the merged block.
-        if (blockToMergeSchema != null) {
-          mergedBlock.setDataSchema(blockToMergeSchema);
-          mergedBlock.setSelectionResult(blockToMergeResultSet);
-        }
-      } else {
-        // Some data in merged block.
-
-        boolean isSelectionOrderBy = queryContext.getOrderByExpressions() != null;
-        int limit = queryContext.getLimit();
-
-        // No need to merge if already got enough rows for selection only.
-        if (!isSelectionOrderBy && mergedBlockResultSet.size() == limit) {
-          return;
-        }
-
-        // Merge only if there are data in block to merge.
-        if (blockToMergeSchema != null && blockToMergeResultSet.size() > 0) {
-          if (mergedBlockSchema.isTypeCompatibleWith(blockToMergeSchema)) {
-            // Two blocks are mergeable.
-
-            // Upgrade the merged block schema if necessary.
-            mergedBlockSchema.upgradeToCover(blockToMergeSchema);
-
-            // Merge two blocks.
-            if (isSelectionOrderBy) {
-              // Combine selection order-by.
-              SelectionOperatorUtils
-                  .mergeWithOrdering((PriorityQueue<Object[]>) mergedBlockResultSet, blockToMergeResultSet,
-                      queryContext.getOffset() + limit);
-            } else {
-              // Combine selection only.
-              SelectionOperatorUtils.mergeWithoutOrdering(mergedBlockResultSet, blockToMergeResultSet, limit);
-            }
-            mergedBlock.setSelectionResult(mergedBlockResultSet);
-          } else {
-            // Two blocks are not mergeable.
-
-            String errorMessage = "Data schema inconsistency between merged block schema: " + mergedBlockSchema
-                + " and block to merge schema: " + blockToMergeSchema + ", drop block to merge";
-            LOGGER.info(errorMessage);
-            mergedBlock.addToProcessingExceptions(
-                QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
-          }
-        }
-      }
-    }
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
similarity index 79%
rename from pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
index 3a670fe..bfbb6c4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.combine;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.exception.EarlyTerminationException;
@@ -60,30 +62,40 @@ public class CombineSlowOperatorsTest {
   }
 
   @Test
-  public void testCombineOperator() {
+  public void testSelectionOnlyCombineOperator() {
     List<Operator> operators = getOperators();
-    CombineOperator combineOperator =
-        new CombineOperator(operators, QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"),
-            _executorService, TIMEOUT_MS);
+    SelectionOnlyCombineOperator combineOperator = new SelectionOnlyCombineOperator(operators,
+        QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"), _executorService, TIMEOUT_MS);
+    testCombineOperator(operators, combineOperator);
+  }
+
+  // NOTE: Skip the test for SelectionOrderByCombineOperator because it requires SelectionOrderByOperator for the early
+  //       termination optimization.
+
+  @Test
+  public void testAggregationOnlyCombineOperator() {
+    List<Operator> operators = getOperators();
+    AggregationOnlyCombineOperator combineOperator = new AggregationOnlyCombineOperator(operators,
+        QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table"), _executorService, TIMEOUT_MS);
     testCombineOperator(operators, combineOperator);
   }
 
   @Test
-  public void testCombineGroupByOperator() {
+  public void testGroupByCombineOperator() {
     List<Operator> operators = getOperators();
-    CombineGroupByOperator combineGroupByOperator = new CombineGroupByOperator(operators,
+    GroupByCombineOperator combineOperator = new GroupByCombineOperator(operators,
         QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"),
         _executorService, TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
-    testCombineOperator(operators, combineGroupByOperator);
+    testCombineOperator(operators, combineOperator);
   }
 
   @Test
-  public void testCombineGroupByOrderByOperator() {
+  public void testGroupByOrderByCombineOperator() {
     List<Operator> operators = getOperators();
-    CombineGroupByOrderByOperator combineGroupByOrderByOperator = new CombineGroupByOrderByOperator(operators,
+    GroupByOrderByCombineOperator combineOperator = new GroupByOrderByCombineOperator(operators,
         QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"),
         _executorService, TIMEOUT_MS);
-    testCombineOperator(operators, combineGroupByOrderByOperator);
+    testCombineOperator(operators, combineOperator);
   }
 
   /**
@@ -151,7 +163,7 @@ public class CombineSlowOperatorsTest {
 
     @Override
     public ExecutionStatistics getExecutionStatistics() {
-      return new ExecutionStatistics();
+      return new ExecutionStatistics(0, 0, 0, 0);
     }
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
new file mode 100644
index 0000000..ac4ecf5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.plan.CombinePlanNode;
+import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Test for {@link SelectionOnlyCombineOperator} and {@link SelectionOrderByCombineOperator}.
+ */
+public class SelectionCombineOperatorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SelectionCombineEarlyTerminationTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME_PREFIX = "testSegment_";
+
+  // Create (MAX_NUM_THREADS_PER_QUERY * 2) segments so that each thread needs to process 2 segments
+  private static final int NUM_SEGMENTS = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 2;
+  private static final int NUM_RECORDS_PER_SEGMENT = 100;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build();
+
+  private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+  private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
+
+  private List<IndexSegment> _indexSegments;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+    _indexSegments = new ArrayList<>(NUM_SEGMENTS);
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      _indexSegments.add(createSegment(i));
+    }
+  }
+
+  private IndexSegment createSegment(int index)
+      throws Exception {
+    int baseValue = index * NUM_RECORDS_PER_SEGMENT / 2;
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS_PER_SEGMENT);
+    for (int i = 0; i < NUM_RECORDS_PER_SEGMENT; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, baseValue + i);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    String segmentName = SEGMENT_NAME_PREFIX + index;
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    return ImmutableSegmentLoader.load(new File(TEMP_DIR, segmentName), ReadMode.mmap);
+  }
+
+  @Test
+  public void testSelectionLimit0() {
+    IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable LIMIT 0");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    assertNotNull(combineResult.getSelectionResult());
+    assertTrue(combineResult.getSelectionResult().isEmpty());
+    assertEquals(combineResult.getNumDocsScanned(), 0);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), 0);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumSegmentsMatched(), 0);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+  }
+
+  @Test
+  public void testSelectionOnly() {
+    IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    assertNotNull(combineResult.getSelectionResult());
+    assertEquals(combineResult.getSelectionResult().size(), 10);
+    // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
+    // segment.
+    long numDocsScanned = combineResult.getNumDocsScanned();
+    assertTrue(numDocsScanned >= 10 && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    int numSegmentsMatched = combineResult.getNumSegmentsMatched();
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+
+    combineResult = getCombineResult("SELECT * FROM testTable LIMIT 10000");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    assertNotNull(combineResult.getSelectionResult());
+    assertEquals(combineResult.getSelectionResult().size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    // Should not early-terminate
+    numDocsScanned = combineResult.getNumDocsScanned();
+    assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+  }
+
+  @Test
+  public void testSelectionOrderBy() {
+    IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult();
+    assertNotNull(selectionResult);
+    assertEquals(selectionResult.size(), 10);
+    int expectedValue = 9;
+    while (!selectionResult.isEmpty()) {
+      assertEquals((int) selectionResult.poll()[0], expectedValue--);
+    }
+    // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
+    // segment.
+    long numDocsScanned = combineResult.getNumDocsScanned();
+    assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
+        && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    int numSegmentsMatched = combineResult.getNumSegmentsMatched();
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+
+    combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult();
+    assertNotNull(selectionResult);
+    assertEquals(selectionResult.size(), 10);
+    expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40;
+    while (!selectionResult.isEmpty()) {
+      assertEquals((int) selectionResult.poll()[0], expectedValue++);
+    }
+    // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
+    // segment.
+    numDocsScanned = combineResult.getNumDocsScanned();
+    assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
+        && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    numSegmentsMatched = combineResult.getNumSegmentsMatched();
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+
+    combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult();
+    assertNotNull(selectionResult);
+    assertEquals(selectionResult.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    // Should not early-terminate
+    numDocsScanned = combineResult.getNumDocsScanned();
+    assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+  }
+
+  private IntermediateResultsBlock getCombineResult(String query) {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(query);
+    List<PlanNode> planNodes = new ArrayList<>(NUM_SEGMENTS);
+    for (IndexSegment indexSegment : _indexSegments) {
+      planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    return combinePlanNode.run().nextBlock();
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    for (IndexSegment indexSegment : _indexSegments) {
+      indexSegment.destroy();
+    }
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
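
A quick arithmetic check of the DESC expectedValue seed used above; the class below is illustrative, not part of the test. Segment i holds values in [i * 50, i * 50 + 99], so adjacent segments overlap by half a segment:

public class ExpectedBoundarySketch {
  public static void main(String[] args) {
    int numSegments = 20;             // assumed; the test derives this from MAX_NUM_THREADS_PER_QUERY * 2
    int numRecordsPerSegment = 100;
    // Segment i holds values [i * 50, i * 50 + 99], so the global max is (n - 1) * 50 + 99
    int maxValue = (numSegments - 1) * numRecordsPerSegment / 2 + (numRecordsPerSegment - 1);
    int smallestOfTop10 = maxValue - 9;
    // Prints true: the smallest of the top 10 equals n * 100 / 2 + 40, the expectedValue seed
    System.out.println(smallestOfTop10 == numSegments * numRecordsPerSegment / 2 + 40);
  }
}
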
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index 4ec7fbc..ff32e25 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.queries;
 
 import java.io.File;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -68,7 +67,6 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest {
   private static final String AVRO_DATA = "data" + File.separator + "test_data-sv.avro";
   private static final String SEGMENT_NAME = "testTable_126164076_167572854";
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "SingleValueQueriesTest");
-  private static final int NUM_SEGMENTS = 2;
 
   // Hard-coded query filter.
   private static final String QUERY_FILTER =
@@ -127,15 +125,7 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest {
       throws Exception {
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     _indexSegment = immutableSegment;
-    int numSegments = getNumSegments();
-    _indexSegments = new ArrayList<>(numSegments);
-    for (int i = 0; i < numSegments; i++) {
-      _indexSegments.add(immutableSegment);
-    }
-  }
-
-  protected int getNumSegments() {
-    return NUM_SEGMENTS;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
deleted file mode 100644
index bca78a0..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.queries;
-
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.core.operator.CombineOperator;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Early termination test for selection-only queries.
- */
-public class SelectionOnlyEarlyTerminationTest extends BaseSingleValueQueriesTest {
-  private static final int NUM_DOCS_PER_SEGMENT = 30000;
-  private static final int NUM_SERVERS = 2;
-
-  /**
-   * In order to ensure each thread is executing more than 1 segment, this test is against
-   * (2 * MAX_NUM_THREADS_PER_QUERY) segments per server.
-   */
-  @Override
-  protected int getNumSegments() {
-    return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2;
-  }
-
-  /**
-   * With early termination, selection-only query is scheduled with {@link CombineOperator#MAX_NUM_THREADS_PER_QUERY}
-   * threads per server, and the total number of segments matched (segments with non-zero documents scanned) should be
-   * the same as the total number of threads for each server.
-   */
-  @Test
-  public void testSelectOnlyQuery() {
-    int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY;
-    int numSegmentsPerServer = getNumSegments();
-
-    // LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480
-    for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) {
-      String query = String.format("SELECT column1, column7, column9, column6 FROM testTable LIMIT %d", limit);
-      int numColumnsInSelection = 4;
-      BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
-      assertNotNull(brokerResponse.getSelectionResults());
-      assertNull(brokerResponse.getResultTable());
-      assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
-      // NOTE: 'numSegmentsMatched' and 'numDocsScanned' could be in a range because when the CombineOperator second
-      //       phase merge early terminates, the operators might not finish scanning the documents
-      long numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
-      assertTrue(numSegmentsMatched >= NUM_SERVERS && numSegmentsMatched <= numThreadsPerServer * NUM_SERVERS);
-      long numDocsScanned = brokerResponse.getNumDocsScanned();
-      assertTrue(numDocsScanned >= NUM_SERVERS * limit && numDocsScanned <= numThreadsPerServer * NUM_SERVERS * limit);
-      assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-      assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), numDocsScanned * numColumnsInSelection);
-      // Total number of documents should not be affected by early-termination
-      assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-
-      brokerResponse = getBrokerResponseForSqlQuery(query);
-      assertNull(brokerResponse.getSelectionResults());
-      assertNotNull(brokerResponse.getResultTable());
-      assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
-      // NOTE: 'numSegmentsMatched' and 'numDocsScanned' could be in a range because when the CombineOperator second
-      //       phase merge early terminates, the operators might not finish scanning the documents
-      numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
-      assertTrue(numSegmentsMatched >= NUM_SERVERS && numSegmentsMatched <= numThreadsPerServer * NUM_SERVERS);
-      numDocsScanned = brokerResponse.getNumDocsScanned();
-      assertTrue(numDocsScanned >= NUM_SERVERS * limit && numDocsScanned <= numThreadsPerServer * NUM_SERVERS * limit);
-      assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-      assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), numDocsScanned * numColumnsInSelection);
-      // Total number of documents should not be affected by early-termination
-      assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-    }
-  }
-
-  /**
-   * Without early termination, selection order-by query should hit all segments.
-   */
-  @Test
-  public void testSelectWithOrderByQuery() {
-    int numSegmentsPerServer = getNumSegments();
-    String query = "SELECT column11, column18, column1 FROM testTable ORDER BY column11";
-    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
-    assertNotNull(brokerResponse.getSelectionResults());
-    assertNull(brokerResponse.getResultTable());
-    assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
-    assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer * NUM_SERVERS);
-    assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-    // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 non-order-by columns) per segment
-    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
-        brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * NUM_SERVERS);
-    assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-
-    brokerResponse = getBrokerResponseForSqlQuery(query);
-    assertNull(brokerResponse.getSelectionResults());
-    assertNotNull(brokerResponse.getResultTable());
-    assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
-    assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer * NUM_SERVERS);
-    assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
-    // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 non-order-by columns) per segment
-    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
-        brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * NUM_SERVERS);
-    assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org