You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/09/21 23:15:07 UTC
[pinot] branch master updated: Unify CombineOperator multi-threading logic (#7450)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4972110 Unify CombineOperator multi-threading logic (#7450)
4972110 is described below
commit 49721105554f77c1e0ab7d483875161178d11cde
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 21 16:14:51 2021 -0700
Unify CombineOperator multi-threading logic (#7450)
Unify the logic of using multiple threads to process segments within the combine operators
- Make group-by combine operators follow the numThreads limit so that we can control their thread usage
- Extract common logic in combine operators and simplify the actual implementation
NOTE: To keep the existing behavior, this PR doesn't actually limit the thread usage of the group-by combine operators; it only makes limiting it possible
---
.../core/operator/combine/BaseCombineOperator.java | 63 +++++++++-------
.../operator/combine/GroupByCombineOperator.java | 31 ++++----
.../combine/GroupByOrderByCombineOperator.java | 83 ++++++++++------------
...xValueBasedSelectionOrderByCombineOperator.java | 58 ++++++---------
.../StreamingSelectionOnlyCombineOperator.java | 32 +++------
5 files changed, 122 insertions(+), 145 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..4228243 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -55,21 +55,21 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
protected final QueryContext _queryContext;
protected final ExecutorService _executorService;
protected final long _endTimeMs;
- protected final int _numTasks;
+ protected final int _numThreads;
protected final Future[] _futures;
// Use a _blockingQueue to store the intermediate results blocks
protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);
protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
- long endTimeMs, int numTasks) {
+ long endTimeMs, int numThreads) {
_operators = operators;
_numOperators = _operators.size();
_queryContext = queryContext;
_executorService = executorService;
_endTimeMs = endTimeMs;
- _numTasks = numTasks;
- _futures = new Future[_numTasks];
+ _numThreads = numThreads;
+ _futures = new Future[_numThreads];
}
protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
@@ -86,8 +86,8 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
// behavior (even JVM crash) when processing queries against it.
Phaser phaser = new Phaser(1);
- for (int i = 0; i < _numTasks; i++) {
- int taskIndex = i;
+ for (int i = 0; i < _numThreads; i++) {
+ int threadIndex = i;
_futures[i] = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
@@ -102,8 +102,15 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
return;
}
try {
- processSegments(taskIndex);
+ processSegments(threadIndex);
+ } catch (EarlyTerminationException e) {
+ // Early-terminated by interruption (canceled by the main thread)
+ } catch (Exception e) {
+ // Caught exception, skip processing the remaining segments
+ LOGGER.error("Caught exception while processing query: {}", _queryContext, e);
+ onException(e);
} finally {
+ onFinish();
phaser.arriveAndDeregister();
}
@@ -135,7 +142,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
* to the pool size.
* TODO: Get the actual number of query worker threads instead of using the default value.
*/
- int numServerThreads = Math.min(_numTasks, ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+ int numServerThreads = Math.min(_numThreads, ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
CombineOperatorUtils
.setExecutionStatistics(mergedBlock, _operators, _totalWorkerThreadCpuTimeNs.get(), numServerThreads);
return mergedBlock;
@@ -144,31 +151,33 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
/**
* Executes query on one or more segments in a worker thread.
*/
- protected void processSegments(int taskIndex) {
- for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
- try {
- IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
- if (isQuerySatisfied(resultsBlock)) {
- // Query is satisfied, skip processing the remaining segments
- _blockingQueue.offer(resultsBlock);
- return;
- } else {
- _blockingQueue.offer(resultsBlock);
- }
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
- e);
- _blockingQueue.offer(new IntermediateResultsBlock(e));
+ protected void processSegments(int threadIndex) {
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+ if (isQuerySatisfied(resultsBlock)) {
+ // Query is satisfied, skip processing the remaining segments
+ _blockingQueue.offer(resultsBlock);
return;
+ } else {
+ _blockingQueue.offer(resultsBlock);
}
}
}
/**
+ * Invoked when {@link #processSegments(int)} throws exception.
+ */
+ protected void onException(Exception e) {
+ _blockingQueue.offer(new IntermediateResultsBlock(e));
+ }
+
+ /**
+ * Invoked when {@link #processSegments(int)} is finished (called in the finally block).
+ */
+ protected void onFinish() {
+ }
+
+ /**
* Merges the results from the worker threads into a results block.
*/
protected IntermediateResultsBlock mergeResults()
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 1ac4725..4cc057e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -38,7 +38,6 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +81,7 @@ public class GroupByCombineOperator extends BaseCombineOperator {
_aggregationFunctions = _queryContext.getAggregationFunctions();
assert _aggregationFunctions != null;
_numAggregationFunctions = _aggregationFunctions.length;
- _operatorLatch = new CountDownLatch(_numOperators);
+ _operatorLatch = new CountDownLatch(_numThreads);
}
@Override
@@ -95,18 +94,17 @@ public class GroupByCombineOperator extends BaseCombineOperator {
*/
@Override
protected void processSegments(int threadIndex) {
- try {
- IntermediateResultsBlock intermediateResultsBlock =
- (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
// Merge processing exceptions.
- List<ProcessingException> processingExceptionsToMerge = intermediateResultsBlock.getProcessingExceptions();
+ List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
if (processingExceptionsToMerge != null) {
_mergedProcessingExceptions.addAll(processingExceptionsToMerge);
}
// Merge aggregation group-by result.
- AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
+ AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
if (aggregationGroupByResult != null) {
// Iterate over the group-by keys, for each key, update the group-by result in the _resultsMap.
Iterator<GroupKeyGenerator.StringGroupKey> groupKeyIterator =
@@ -131,18 +129,19 @@ public class GroupByCombineOperator extends BaseCombineOperator {
});
}
}
- } catch (EarlyTerminationException e) {
- // Early-terminated because query times out or is already satisfied
- } catch (Exception e) {
- LOGGER.error(
- "Caught exception while processing and combining group-by for index: {}, operator: {}, queryContext: {}",
- threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
- _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
- } finally {
- _operatorLatch.countDown();
}
}
+ @Override
+ protected void onException(Exception e) {
+ _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+ }
+
+ @Override
+ protected void onFinish() {
+ _operatorLatch.countDown();
+ }
+
/**
* {@inheritDoc}
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..cf9e9e4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -29,8 +29,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
@@ -48,7 +46,6 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.core.util.QueryOptions;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +62,6 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
private final int _trimSize;
private final int _trimThreshold;
- private final Lock _initLock;
private final int _numAggregationFunctions;
private final int _numGroupByExpressions;
private final int _numColumns;
@@ -73,14 +69,13 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
// We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
// _futures (try to interrupt the execution if it already started).
private final CountDownLatch _operatorLatch;
- private DataSchema _dataSchema;
- private IndexedTable _indexedTable;
+
+ private volatile IndexedTable _indexedTable;
public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long endTimeMs, int minTrimSize, int trimThreshold) {
// GroupByOrderByCombineOperator use numOperators as numThreads
super(operators, queryContext, executorService, endTimeMs, operators.size());
- _initLock = new ReentrantLock();
Map<String, String> queryOptions = queryContext.getQueryOptions();
if (queryOptions != null) {
@@ -111,8 +106,7 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
assert _queryContext.getGroupByExpressions() != null;
_numGroupByExpressions = _queryContext.getGroupByExpressions().size();
_numColumns = _numGroupByExpressions + _numAggregationFunctions;
- int numOperators = _operators.size();
- _operatorLatch = new CountDownLatch(numOperators);
+ _operatorLatch = new CountDownLatch(_numThreads);
}
@Override
@@ -125,43 +119,41 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
*/
@Override
protected void processSegments(int threadIndex) {
- try {
- IntermediateResultsBlock intermediateResultsBlock =
- (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
-
- _initLock.lock();
- try {
- if (_dataSchema == null) {
- _dataSchema = intermediateResultsBlock.getDataSchema();
- // NOTE: Use trimSize as resultSize on server size.
- if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- _indexedTable = new UnboundedConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize);
- } else {
- _indexedTable =
- new ConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+
+ if (_indexedTable == null) {
+ synchronized (this) {
+ if (_indexedTable == null) {
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ // NOTE: Use trimSize as resultSize on server size.
+ if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+ // special case of trim threshold where it is set to max value.
+ // there won't be any trimming during upsert in this case.
+ // thus we can avoid the overhead of read-lock and write-lock
+ // in the upsert method.
+ _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+ } else {
+ _indexedTable =
+ new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+ }
}
}
- } finally {
- _initLock.unlock();
}
// Merge processing exceptions.
- List<ProcessingException> processingExceptionsToMerge = intermediateResultsBlock.getProcessingExceptions();
+ List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
if (processingExceptionsToMerge != null) {
_mergedProcessingExceptions.addAll(processingExceptionsToMerge);
}
// Merge aggregation group-by result.
// Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
- Collection<IntermediateRecord> intermediateRecords = intermediateResultsBlock.getIntermediateRecords();
+ Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
// For now, only GroupBy OrderBy query has pre-constructed intermediate records
if (intermediateRecords == null) {
// Merge aggregation group-by result.
- AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
+ AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
if (aggregationGroupByResult != null) {
// Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
@@ -182,17 +174,19 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
_indexedTable.upsert(intermediateResult._key, intermediateResult._record);
}
}
- } catch (EarlyTerminationException e) {
- // Early-terminated because query times out or is already satisfied
- } catch (Exception e) {
- LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
- + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
- _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
- } finally {
- _operatorLatch.countDown();
}
}
+ @Override
+ protected void onException(Exception e) {
+ _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+ }
+
+ @Override
+ protected void onFinish() {
+ _operatorLatch.countDown();
+ }
+
/**
* {@inheritDoc}
*
@@ -220,16 +214,17 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
}
- _indexedTable.finish(false);
- IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable);
+ IndexedTable indexedTable = _indexedTable;
+ indexedTable.finish(false);
+ IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(indexedTable);
// Set the processing exceptions.
if (!_mergedProcessingExceptions.isEmpty()) {
mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
}
- mergedBlock.setNumResizes(_indexedTable.getNumResizes());
- mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
+ mergedBlock.setNumResizes(indexedTable.getNumResizes());
+ mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs());
// TODO - set numGroupsLimitReached
return mergedBlock;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 20ba869..f6d982f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -38,7 +38,6 @@ import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,7 +140,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
// segment result is merged.
Comparable threadBoundaryValue = null;
- for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
// Calculate the boundary value from global boundary and thread boundary
Comparable boundaryValue = _globalBoundaryValue.get();
if (boundaryValue == null) {
@@ -169,7 +168,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
if (minMaxValueContext._minValue != null) {
int result = minMaxValueContext._minValue.compareTo(boundaryValue);
if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
@@ -180,7 +179,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
if (minMaxValueContext._maxValue != null) {
int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
- _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
+ _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
@@ -189,39 +188,28 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
}
// Process the segment
- try {
- IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
- if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
- // Segment result has enough rows, update the boundary value
- assert selectionResult.peek() != null;
- Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
- if (boundaryValue == null) {
- boundaryValue = segmentBoundaryValue;
+ IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+ PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+ if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+ // Segment result has enough rows, update the boundary value
+ assert selectionResult.peek() != null;
+ Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
+ if (boundaryValue == null) {
+ boundaryValue = segmentBoundaryValue;
+ } else {
+ if (asc) {
+ if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
+ boundaryValue = segmentBoundaryValue;
+ }
} else {
- if (asc) {
- if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
- boundaryValue = segmentBoundaryValue;
- }
- } else {
- if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
- boundaryValue = segmentBoundaryValue;
- }
+ if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
+ boundaryValue = segmentBoundaryValue;
}
}
}
- threadBoundaryValue = boundaryValue;
- _blockingQueue.offer(resultsBlock);
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
- e);
- _blockingQueue.offer(new IntermediateResultsBlock(e));
- return;
}
+ threadBoundaryValue = boundaryValue;
+ _blockingQueue.offer(resultsBlock);
}
}
@@ -250,15 +238,13 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
// Query times out, skip merging the remaining results blocks
LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
_queryContext);
- mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
new TimeoutException("Timed out while polling results block")));
- break;
}
if (blockToMerge.getProcessingExceptions() != null) {
// Caught exception while processing segment, skip merging the remaining results blocks and directly return
// the exception
- mergedBlock = blockToMerge;
- break;
+ return blockToMerge;
}
if (mergedBlock == null) {
mergedBlock = blockToMerge;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index a003e3e..17eb1cd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -35,7 +35,6 @@ import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,30 +70,19 @@ public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator {
@Override
protected void processSegments(int threadIndex) {
- for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+ for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex);
- try {
- IntermediateResultsBlock resultsBlock;
- while ((resultsBlock = operator.nextBlock()) != null) {
- Collection<Object[]> rows = resultsBlock.getSelectionResult();
- assert rows != null;
- long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
- _blockingQueue.offer(resultsBlock);
- if (numRowsCollected >= _limit) {
- return;
- }
+ IntermediateResultsBlock resultsBlock;
+ while ((resultsBlock = operator.nextBlock()) != null) {
+ Collection<Object[]> rows = resultsBlock.getSelectionResult();
+ assert rows != null;
+ long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
+ _blockingQueue.offer(resultsBlock);
+ if (numRowsCollected >= _limit) {
+ return;
}
- _blockingQueue.offer(LAST_RESULTS_BLOCK);
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
- e);
- _blockingQueue.offer(new IntermediateResultsBlock(e));
- return;
}
+ _blockingQueue.offer(LAST_RESULTS_BLOCK);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org