Posted to commits@pinot.apache.org by ja...@apache.org on 2021/09/21 23:15:07 UTC

[pinot] branch master updated: Unify CombineOperator multi-threading logic (#7450)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4972110  Unify CombineOperator multi-threading logic (#7450)
4972110 is described below

commit 49721105554f77c1e0ab7d483875161178d11cde
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 21 16:14:51 2021 -0700

    Unify CombineOperator multi-threading logic (#7450)
    
    Unify the logic of using multiple threads to process segments within the combine operators
    - Make group-by combine operators follow the numThreads limit so that we can control their thread usage
    - Extract common logic in combine operators and simplify the actual implementation
    
    NOTE: This PR doesn't limit the thread usage for the group-by combine operators (to keep the existing behavior); it only makes that possible
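
For reference, a minimal sketch of the unified pattern described above (hypothetical names such as SketchCombineOperator; not the actual Pinot classes): each of numThreads worker threads processes every numThreads-th operator in round-robin order, and subclasses plug into error handling and completion through onException()/onFinish() instead of duplicating try/catch/finally logic per operator.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.function.Supplier;

// Sketch only: Supplier<T> stands in for a segment-level operator producing a results block.
abstract class SketchCombineOperator<T> {
  protected final List<Supplier<T>> operators;
  protected final int numThreads;
  protected final ExecutorService executorService;
  protected final BlockingQueue<T> blockingQueue = new LinkedBlockingQueue<>();

  protected SketchCombineOperator(List<Supplier<T>> operators, int numThreads, ExecutorService executorService) {
    this.operators = operators;
    this.numThreads = numThreads;
    this.executorService = executorService;
  }

  void execute() {
    // The Phaser keeps the main thread from returning while workers may still touch shared state.
    Phaser phaser = new Phaser(1);
    for (int i = 0; i < numThreads; i++) {
      int threadIndex = i;
      phaser.register();
      executorService.submit(() -> {
        try {
          processSegments(threadIndex);
        } catch (Exception e) {
          onException(e);        // subclass decides how to surface the error
        } finally {
          onFinish();            // subclass hook, e.g. counting down a latch
          phaser.arriveAndDeregister();
        }
      });
    }
    phaser.arriveAndAwaitAdvance();  // wait for all workers before merging results
  }

  // Round-robin assignment: thread t handles operators t, t + numThreads, t + 2*numThreads, ...
  protected void processSegments(int threadIndex) {
    for (int operatorIndex = threadIndex; operatorIndex < operators.size(); operatorIndex += numThreads) {
      blockingQueue.offer(operators.get(operatorIndex).get());
    }
  }

  // Default: subclasses may offer an error block to the queue or record a processing exception.
  protected void onException(Exception e) {
  }

  protected void onFinish() {
  }
}

A group-by style subclass would, for example, override onFinish() to count down an operator latch and onException() to add a merged processing exception, which matches the hooks introduced in the diff below.
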
---
 .../core/operator/combine/BaseCombineOperator.java | 63 +++++++++-------
 .../operator/combine/GroupByCombineOperator.java   | 31 ++++----
 .../combine/GroupByOrderByCombineOperator.java     | 83 ++++++++++------------
 ...xValueBasedSelectionOrderByCombineOperator.java | 58 ++++++---------
 .../StreamingSelectionOnlyCombineOperator.java     | 32 +++------
 5 files changed, 122 insertions(+), 145 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..4228243 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -55,21 +55,21 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
   protected final QueryContext _queryContext;
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
-  protected final int _numTasks;
+  protected final int _numThreads;
   protected final Future[] _futures;
   // Use a _blockingQueue to store the intermediate results blocks
   protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
   protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);
 
   protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
-      long endTimeMs, int numTasks) {
+      long endTimeMs, int numThreads) {
     _operators = operators;
     _numOperators = _operators.size();
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
-    _numTasks = numTasks;
-    _futures = new Future[_numTasks];
+    _numThreads = numThreads;
+    _futures = new Future[_numThreads];
   }
 
   protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
@@ -86,8 +86,8 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
     // behavior (even JVM crash) when processing queries against it.
     Phaser phaser = new Phaser(1);
 
-    for (int i = 0; i < _numTasks; i++) {
-      int taskIndex = i;
+    for (int i = 0; i < _numThreads; i++) {
+      int threadIndex = i;
       _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
@@ -102,8 +102,15 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
             return;
           }
           try {
-            processSegments(taskIndex);
+            processSegments(threadIndex);
+          } catch (EarlyTerminationException e) {
+            // Early-terminated by interruption (canceled by the main thread)
+          } catch (Exception e) {
+            // Caught exception, skip processing the remaining segments
+            LOGGER.error("Caught exception while processing query: {}", _queryContext, e);
+            onException(e);
           } finally {
+            onFinish();
             phaser.arriveAndDeregister();
           }
 
@@ -135,7 +142,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
      * to the pool size.
      * TODO: Get the actual number of query worker threads instead of using the default value.
      */
-    int numServerThreads = Math.min(_numTasks, ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+    int numServerThreads = Math.min(_numThreads, ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
     CombineOperatorUtils
         .setExecutionStatistics(mergedBlock, _operators, _totalWorkerThreadCpuTimeNs.get(), numServerThreads);
     return mergedBlock;
@@ -144,31 +151,33 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
   /**
    * Executes query on one or more segments in a worker thread.
    */
-  protected void processSegments(int taskIndex) {
-    for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
-      try {
-        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-        if (isQuerySatisfied(resultsBlock)) {
-          // Query is satisfied, skip processing the remaining segments
-          _blockingQueue.offer(resultsBlock);
-          return;
-        } else {
-          _blockingQueue.offer(resultsBlock);
-        }
-      } catch (EarlyTerminationException e) {
-        // Early-terminated by interruption (canceled by the main thread)
-        return;
-      } catch (Exception e) {
-        // Caught exception, skip processing the remaining operators
-        LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
-            e);
-        _blockingQueue.offer(new IntermediateResultsBlock(e));
+  protected void processSegments(int threadIndex) {
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+      if (isQuerySatisfied(resultsBlock)) {
+        // Query is satisfied, skip processing the remaining segments
+        _blockingQueue.offer(resultsBlock);
         return;
+      } else {
+        _blockingQueue.offer(resultsBlock);
       }
     }
   }
 
   /**
+   * Invoked when {@link #processSegments(int)} throws exception.
+   */
+  protected void onException(Exception e) {
+    _blockingQueue.offer(new IntermediateResultsBlock(e));
+  }
+
+  /**
+   * Invoked when {@link #processSegments(int)} is finished (called in the finally block).
+   */
+  protected void onFinish() {
+  }
+
+  /**
    * Merges the results from the worker threads into a results block.
    */
   protected IntermediateResultsBlock mergeResults()
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 1ac4725..4cc057e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -38,7 +38,6 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +81,7 @@ public class GroupByCombineOperator extends BaseCombineOperator {
     _aggregationFunctions = _queryContext.getAggregationFunctions();
     assert _aggregationFunctions != null;
     _numAggregationFunctions = _aggregationFunctions.length;
-    _operatorLatch = new CountDownLatch(_numOperators);
+    _operatorLatch = new CountDownLatch(_numThreads);
   }
 
   @Override
@@ -95,18 +94,17 @@ public class GroupByCombineOperator extends BaseCombineOperator {
    */
   @Override
   protected void processSegments(int threadIndex) {
-    try {
-      IntermediateResultsBlock intermediateResultsBlock =
-          (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
 
       // Merge processing exceptions.
-      List<ProcessingException> processingExceptionsToMerge = intermediateResultsBlock.getProcessingExceptions();
+      List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
       if (processingExceptionsToMerge != null) {
         _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
       }
 
       // Merge aggregation group-by result.
-      AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
+      AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
       if (aggregationGroupByResult != null) {
         // Iterate over the group-by keys, for each key, update the group-by result in the _resultsMap.
         Iterator<GroupKeyGenerator.StringGroupKey> groupKeyIterator =
@@ -131,18 +129,19 @@ public class GroupByCombineOperator extends BaseCombineOperator {
           });
         }
       }
-    } catch (EarlyTerminationException e) {
-      // Early-terminated because query times out or is already satisfied
-    } catch (Exception e) {
-      LOGGER.error(
-          "Caught exception while processing and combining group-by for index: {}, operator: {}, queryContext: {}",
-          threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
-      _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
-    } finally {
-      _operatorLatch.countDown();
     }
   }
 
+  @Override
+  protected void onException(Exception e) {
+    _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+  }
+
+  @Override
+  protected void onFinish() {
+    _operatorLatch.countDown();
+  }
+
   /**
    * {@inheritDoc}
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..cf9e9e4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -29,8 +29,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -48,7 +46,6 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.core.util.QueryOptions;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +62,6 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
   private final int _trimSize;
   private final int _trimThreshold;
-  private final Lock _initLock;
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
@@ -73,14 +69,13 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
   // _futures (try to interrupt the execution if it already started).
   private final CountDownLatch _operatorLatch;
-  private DataSchema _dataSchema;
-  private IndexedTable _indexedTable;
+
+  private volatile IndexedTable _indexedTable;
 
   public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs, int minTrimSize, int trimThreshold) {
     // GroupByOrderByCombineOperator use numOperators as numThreads
     super(operators, queryContext, executorService, endTimeMs, operators.size());
-    _initLock = new ReentrantLock();
 
     Map<String, String> queryOptions = queryContext.getQueryOptions();
     if (queryOptions != null) {
@@ -111,8 +106,7 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
     assert _queryContext.getGroupByExpressions() != null;
     _numGroupByExpressions = _queryContext.getGroupByExpressions().size();
     _numColumns = _numGroupByExpressions + _numAggregationFunctions;
-    int numOperators = _operators.size();
-    _operatorLatch = new CountDownLatch(numOperators);
+    _operatorLatch = new CountDownLatch(_numThreads);
   }
 
   @Override
@@ -125,43 +119,41 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
    */
   @Override
   protected void processSegments(int threadIndex) {
-    try {
-      IntermediateResultsBlock intermediateResultsBlock =
-          (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
-
-      _initLock.lock();
-      try {
-        if (_dataSchema == null) {
-          _dataSchema = intermediateResultsBlock.getDataSchema();
-          // NOTE: Use trimSize as resultSize on server size.
-          if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-            // special case of trim threshold where it is set to max value.
-            // there won't be any trimming during upsert in this case.
-            // thus we can avoid the overhead of read-lock and write-lock
-            // in the upsert method.
-            _indexedTable = new UnboundedConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize);
-          } else {
-            _indexedTable =
-                new ConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+
+      if (_indexedTable == null) {
+        synchronized (this) {
+          if (_indexedTable == null) {
+            DataSchema dataSchema = resultsBlock.getDataSchema();
+            // NOTE: Use trimSize as resultSize on server size.
+            if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+              // special case of trim threshold where it is set to max value.
+              // there won't be any trimming during upsert in this case.
+              // thus we can avoid the overhead of read-lock and write-lock
+              // in the upsert method.
+              _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+            } else {
+              _indexedTable =
+                  new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+            }
           }
         }
-      } finally {
-        _initLock.unlock();
       }
 
       // Merge processing exceptions.
-      List<ProcessingException> processingExceptionsToMerge = intermediateResultsBlock.getProcessingExceptions();
+      List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
       if (processingExceptionsToMerge != null) {
         _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
       }
 
       // Merge aggregation group-by result.
       // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
-      Collection<IntermediateRecord> intermediateRecords = intermediateResultsBlock.getIntermediateRecords();
+      Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
       // For now, only GroupBy OrderBy query has pre-constructed intermediate records
       if (intermediateRecords == null) {
         // Merge aggregation group-by result.
-        AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
+        AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
         if (aggregationGroupByResult != null) {
           // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
           Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
@@ -182,17 +174,19 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
           _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
         }
       }
-    } catch (EarlyTerminationException e) {
-      // Early-terminated because query times out or is already satisfied
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
-          + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
-      _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
-    } finally {
-      _operatorLatch.countDown();
     }
   }
 
+  @Override
+  protected void onException(Exception e) {
+    _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+  }
+
+  @Override
+  protected void onFinish() {
+    _operatorLatch.countDown();
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -220,16 +214,17 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       return new IntermediateResultsBlock(new TimeoutException(errorMessage));
     }
 
-    _indexedTable.finish(false);
-    IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable);
+    IndexedTable indexedTable = _indexedTable;
+    indexedTable.finish(false);
+    IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(indexedTable);
 
     // Set the processing exceptions.
     if (!_mergedProcessingExceptions.isEmpty()) {
       mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
     }
 
-    mergedBlock.setNumResizes(_indexedTable.getNumResizes());
-    mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
+    mergedBlock.setNumResizes(indexedTable.getNumResizes());
+    mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs());
     // TODO - set numGroupsLimitReached
     return mergedBlock;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 20ba869..f6d982f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -38,7 +38,6 @@ import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,7 +140,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
     //       segment result is merged.
     Comparable threadBoundaryValue = null;
 
-    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
       // Calculate the boundary value from global boundary and thread boundary
       Comparable boundaryValue = _globalBoundaryValue.get();
       if (boundaryValue == null) {
@@ -169,7 +168,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
           if (minMaxValueContext._minValue != null) {
             int result = minMaxValueContext._minValue.compareTo(boundaryValue);
             if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
-              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
               _blockingQueue.offer(LAST_RESULTS_BLOCK);
               return;
             }
@@ -180,7 +179,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
           if (minMaxValueContext._maxValue != null) {
             int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
             if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
-              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
               _blockingQueue.offer(LAST_RESULTS_BLOCK);
               return;
             }
@@ -189,39 +188,28 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
       }
 
       // Process the segment
-      try {
-        IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
-        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
-        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
-          // Segment result has enough rows, update the boundary value
-          assert selectionResult.peek() != null;
-          Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
-          if (boundaryValue == null) {
-            boundaryValue = segmentBoundaryValue;
+      IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+      PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+      if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+        // Segment result has enough rows, update the boundary value
+        assert selectionResult.peek() != null;
+        Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
+        if (boundaryValue == null) {
+          boundaryValue = segmentBoundaryValue;
+        } else {
+          if (asc) {
+            if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
+              boundaryValue = segmentBoundaryValue;
+            }
           } else {
-            if (asc) {
-              if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
-                boundaryValue = segmentBoundaryValue;
-              }
-            } else {
-              if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
-                boundaryValue = segmentBoundaryValue;
-              }
+            if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
+              boundaryValue = segmentBoundaryValue;
             }
           }
         }
-        threadBoundaryValue = boundaryValue;
-        _blockingQueue.offer(resultsBlock);
-      } catch (EarlyTerminationException e) {
-        // Early-terminated by interruption (canceled by the main thread)
-        return;
-      } catch (Exception e) {
-        // Caught exception, skip processing the remaining operators
-        LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
-            e);
-        _blockingQueue.offer(new IntermediateResultsBlock(e));
-        return;
       }
+      threadBoundaryValue = boundaryValue;
+      _blockingQueue.offer(resultsBlock);
     }
   }
 
@@ -250,15 +238,13 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
         // Query times out, skip merging the remaining results blocks
         LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
             _queryContext);
-        mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+        return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
             new TimeoutException("Timed out while polling results block")));
-        break;
       }
       if (blockToMerge.getProcessingExceptions() != null) {
         // Caught exception while processing segment, skip merging the remaining results blocks and directly return
         // the exception
-        mergedBlock = blockToMerge;
-        break;
+        return blockToMerge;
       }
       if (mergedBlock == null) {
         mergedBlock = blockToMerge;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index a003e3e..17eb1cd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -35,7 +35,6 @@ import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,30 +70,19 @@ public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator {
 
   @Override
   protected void processSegments(int threadIndex) {
-    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
       Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex);
-      try {
-        IntermediateResultsBlock resultsBlock;
-        while ((resultsBlock = operator.nextBlock()) != null) {
-          Collection<Object[]> rows = resultsBlock.getSelectionResult();
-          assert rows != null;
-          long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
-          _blockingQueue.offer(resultsBlock);
-          if (numRowsCollected >= _limit) {
-            return;
-          }
+      IntermediateResultsBlock resultsBlock;
+      while ((resultsBlock = operator.nextBlock()) != null) {
+        Collection<Object[]> rows = resultsBlock.getSelectionResult();
+        assert rows != null;
+        long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
+        _blockingQueue.offer(resultsBlock);
+        if (numRowsCollected >= _limit) {
+          return;
         }
-        _blockingQueue.offer(LAST_RESULTS_BLOCK);
-      } catch (EarlyTerminationException e) {
-        // Early-terminated by interruption (canceled by the main thread)
-        return;
-      } catch (Exception e) {
-        // Caught exception, skip processing the remaining operators
-        LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
-            e);
-        _blockingQueue.offer(new IntermediateResultsBlock(e));
-        return;
       }
+      _blockingQueue.offer(LAST_RESULTS_BLOCK);
     }
   }
 
