You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/03/17 06:02:17 UTC
[incubator-pinot] branch master updated: Extends
SelectionOrderByCombineOperator from BaseCombineOperator (#6672)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d1cf36a Extends SelectionOrderByCombineOperator from BaseCombineOperator (#6672)
d1cf36a is described below
commit d1cf36a3cbf8653d7fdf49a3bfc422ee71d1d302
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Tue Mar 16 23:01:55 2021 -0700
Extends SelectionOrderByCombineOperator from BaseCombineOperator (#6672)
* Extends SelectionOrderByCombineOperator from BaseCombineOperator
* put the try MinMaxValueBasedCombine logic into a separate function
---
...xValueBasedSelectionOrderByCombineOperator.java | 297 +++++++++++++++++++++
.../core/operator/combine/MinMaxValueContext.java | 36 +++
.../combine/SelectionOrderByCombineOperator.java | 247 ++---------------
3 files changed, 361 insertions(+), 219 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
new file mode 100644
index 0000000..901d6bc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+
+/**
+ * Optimized combine operator for selection order-by queries.
+ * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on
+ * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
+ * <ul>
+ *   <li>1. Sort all the segments by the column min/max value</li>
+ *   <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
+ *   <li>3. Skip processing the segments that cannot add values to the final result</li>
+ * </ul>
+ * <p>NOTE(review): this class does not sort anything itself; it reads {@code minMaxValueContexts} by operator index, so
+ * it presumably relies on the caller passing the list already ordered by min/max value - confirm against the caller.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "MinMaxValueBasedSelectionOrderByCombineOperator";
+
+  // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
+  // special IntermediateResultsBlock into the BlockingQueue to awake the main thread
+  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
+      new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
+          Collections.emptyList());
+
+  private final int _numOperators;
+  // Stride used by each worker thread when walking the operators (thread i handles i, i + _numThreads, ...)
+  private final int _numThreads;
+  // Use a BlockingQueue to store the per-segment result
+  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
+  private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
+  // Boundary value taken from the merged result once it holds _numRowsToKeep rows; written by the merging thread and
+  // read by the worker threads
+  private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();
+  // LIMIT + OFFSET of the query
+  private final int _numRowsToKeep;
+  private final List<MinMaxValueContext> _minMaxValueContexts;
+
+  /**
+   * Creates the combine operator.
+   *
+   * @param operators per-segment operators, forwarded to the base class
+   * @param queryContext query being executed; supplies the order-by expressions, LIMIT and OFFSET
+   * @param executorService executor forwarded to the base class
+   * @param endTimeMs forwarded to the base class
+   * @param minMaxValueContexts per-segment operator plus column min/max value, read by operator index; must contain
+   *                            at least {@code operators.size()} entries
+   */
+  public MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) {
+    super(operators, queryContext, executorService, endTimeMs);
+    _minMaxValueContexts = minMaxValueContexts;
+    _numOperators = _operators.size();
+    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
+    // Capacity of one slot per operator, so the non-blocking offer() calls below always have room
+    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks
+   * into BlockingQueue, skip processing the segments if possible based on the column min/max value and keep enough
+   * documents to fulfill the LIMIT and OFFSET requirement.
+   *
+   * @param threadIndex index of the calling worker thread; it processes operators {@code threadIndex},
+   *                    {@code threadIndex + _numThreads}, ...
+   */
+  @Override
+  protected void processSegments(int threadIndex) {
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    int numOrderByExpressions = orderByExpressions.size();
+    assert numOrderByExpressions > 0;
+    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
+    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
+    boolean asc = firstOrderByExpression.isAsc();
+
+    try {
+      // Register the thread to the _phaser
+      // NOTE: If the _phaser is terminated (returning negative value) when trying to register the thread, that
+      //       means the query execution has finished, and the main thread has deregistered itself and returned
+      //       the result. Directly return as no execution result will be taken.
+      if (_phaser.register() < 0) {
+        return;
+      }
+
+      // Keep a boundary value for the thread
+      // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
+      //       value is updated after processing the segment, while global boundary value is updated after the
+      //       segment result is merged.
+      Comparable threadBoundaryValue = null;
+
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+        // Calculate the boundary value from global boundary and thread boundary
+        Comparable boundaryValue = pickTighterBoundary(_globalBoundaryValue.get(), threadBoundaryValue, asc);
+
+        // Check if the segment can be skipped
+        MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
+        if (boundaryValue != null && canSkipSegment(minMaxValueContext, boundaryValue, asc, numOrderByExpressions)) {
+          // This thread stops here. The operators after the current one in its stride are counted as skipped; the
+          // current one is accounted for by LAST_RESULTS_BLOCK, which the main thread counts as a merged block.
+          _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
+          _blockingQueue.offer(LAST_RESULTS_BLOCK);
+          return;
+        }
+
+        // Process the segment
+        try {
+          IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+          PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+          if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+            // Segment result has enough rows, update the boundary value (column 0 of the head row of the queue)
+            assert selectionResult.peek() != null;
+            Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
+            boundaryValue = pickTighterBoundary(boundaryValue, segmentBoundaryValue, asc);
+          }
+          threadBoundaryValue = boundaryValue;
+          _blockingQueue.offer(resultsBlock);
+        } catch (EarlyTerminationException e) {
+          // Early-terminated by interruption (canceled by the main thread)
+          return;
+        } catch (Exception e) {
+          // Caught exception, skip processing the remaining operators
+          LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
+              _queryContext, e);
+          _blockingQueue.offer(new IntermediateResultsBlock(e));
+          return;
+        }
+      }
+    } finally {
+      _phaser.arriveAndDeregister();
+    }
+  }
+
+  /**
+   * Returns the tighter of two boundary values: the smaller one for ascending order, the larger one for descending
+   * order. A {@code null} argument means "no boundary known yet" and the other argument is returned as is (which may
+   * also be {@code null}). When both compare as equal, {@code current} is returned.
+   */
+  private static Comparable pickTighterBoundary(Comparable current, Comparable candidate, boolean asc) {
+    if (current == null) {
+      return candidate;
+    }
+    if (candidate == null) {
+      return current;
+    }
+    int result = candidate.compareTo(current);
+    boolean candidateIsTighter = asc ? result < 0 : result > 0;
+    return candidateIsTighter ? candidate : current;
+  }
+
+  /**
+   * Returns whether the segment (and therefore the rest of this thread's segments) needs no processing.
+   * <p>For ascending order, no need to process more segments if the column min value is larger than the boundary
+   * value, or is equal to the boundary value and there is only one order-by expression. For descending order, the
+   * same applies to the column max value being smaller than the boundary value. A segment without the relevant
+   * min/max value is never skipped.
+   */
+  private static boolean canSkipSegment(MinMaxValueContext minMaxValueContext, Comparable boundaryValue, boolean asc,
+      int numOrderByExpressions) {
+    Comparable extremeValue = asc ? minMaxValueContext._minValue : minMaxValueContext._maxValue;
+    if (extremeValue == null) {
+      return false;
+    }
+    int result = extremeValue.compareTo(boundaryValue);
+    if (result == 0) {
+      return numOrderByExpressions == 1;
+    }
+    return asc ? result > 0 : result < 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Combines intermediate selection result blocks from underlying operators and returns a merged one.
+   * <ul>
+   *   <li>
+   *     Merges multiple intermediate selection result blocks as a merged one.
+   *   </li>
+   *   <li>
+   *     Set all exceptions encountered during execution into the merged result block
+   *   </li>
+   * </ul>
+   */
+  @Override
+  protected IntermediateResultsBlock mergeResultsFromSegments() {
+    IntermediateResultsBlock mergedBlock = null;
+    try {
+      int numBlocksMerged = 0;
+      // Every operator is accounted for either by a polled block (including LAST_RESULTS_BLOCK) or by the skip counter
+      while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
+        IntermediateResultsBlock blockToMerge =
+            _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        if (blockToMerge == null) {
+          // Query times out, skip merging the remaining results blocks
+          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+              _queryContext);
+          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
+          break;
+        }
+        if (blockToMerge.getProcessingExceptions() != null) {
+          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
+          // the exception
+          mergedBlock = blockToMerge;
+          break;
+        }
+        if (mergedBlock == null) {
+          mergedBlock = blockToMerge;
+        } else if (blockToMerge != LAST_RESULTS_BLOCK) {
+          mergeResultsBlocks(mergedBlock, blockToMerge);
+        }
+        numBlocksMerged++;
+
+        // Update the boundary value if enough rows are collected
+        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+          assert selectionResult.peek() != null;
+          _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : _futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+    }
+    return mergedBlock;
+  }
+
+  /**
+   * Merges the rows of {@code blockToMerge} into {@code mergedBlock}, keeping at most LIMIT + OFFSET rows.
+   * <p>When the two data schemas differ, the block to merge is dropped and a merge-response error is recorded on
+   * {@code mergedBlock} instead.
+   */
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    DataSchema mergedDataSchema = mergedBlock.getDataSchema();
+    DataSchema dataSchemaToMerge = blockToMerge.getDataSchema();
+    assert mergedDataSchema != null && dataSchemaToMerge != null;
+    if (!mergedDataSchema.equals(dataSchemaToMerge)) {
+      String errorMessage = String
+          .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge",
+              mergedDataSchema, dataSchemaToMerge);
+      // NOTE: This is segment level log, so log at debug level to prevent flooding the log.
+      LOGGER.debug(errorMessage);
+      mergedBlock
+          .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
+      return;
+    }
+
+    PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+    Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult();
+    assert mergedRows != null && rowsToMerge != null;
+    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
new file mode 100644
index 0000000..3f76058
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+
+
+/**
+ * Pairs a {@link SelectionOrderByOperator} with the min and max value of one column, as reported by the data source
+ * metadata of the operator's index segment.
+ */
+public class MinMaxValueContext {
+  final SelectionOrderByOperator _operator;
+  final Comparable _minValue;
+  final Comparable _maxValue;
+
+  /**
+   * Reads the min/max value of {@code column} from the metadata of the operator's index segment.
+   *
+   * @param operator operator whose index segment is inspected
+   * @param column name of the column to look up in the segment
+   */
+  MinMaxValueContext(SelectionOrderByOperator operator, String column) {
+    DataSourceMetadata columnMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+    _minValue = columnMetadata.getMinValue();
+    _maxValue = columnMetadata.getMaxValue();
+    _operator = operator;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index f95908f..be899fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -20,57 +20,44 @@ package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.DataSourceMetadata;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import org.apache.pinot.core.query.exception.EarlyTerminationException;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.core.util.trace.TraceRunnable;
/**
* Combine operator for selection order-by queries.
- * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on
- * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
- * <ul>
- * <li>1. Sort all the segments by the column min/max value</li>
- * <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
- * <li>3. Skip processing the segments that cannot add values to the final result</li>
- * </ul>
+ * <p>When the first order-by expression is an identifier (column), try to use
+ * {@link org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator} first, which will
+ * skip processing some segments based on the column min/max value. Otherwise fall back to the default combine
+ * (process all segments).
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class SelectionOrderByCombineOperator extends BaseCombineOperator {
private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
- // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
- // special IntermediateResultsBlock into the BlockingQueue to awake the main thread
- private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
- new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
- Collections.emptyList());
-
+ private final List<Operator> _operators;
+ private final QueryContext _queryContext;
+ private final ExecutorService _executorService;
+ private final long _endTimeMs;
private final int _numRowsToKeep;
public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long endTimeMs) {
super(operators, queryContext, executorService, endTimeMs);
+ _operators = operators;
+ _queryContext = queryContext;
+ _executorService = executorService;
+ _endTimeMs = endTimeMs;
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
}
@@ -79,20 +66,29 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
return OPERATOR_NAME;
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks
+ * into BlockingQueue. Try to use
+ * {@link org.apache.pinot.core.operator.combine.MinMaxValueBasedSelectionOrderByCombineOperator} first, which
+ * will skip processing some segments based on the column min/max value. Otherwise fall back to the default combine
+ * (process all segments).
+ */
@Override
protected IntermediateResultsBlock getNextBlock() {
List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
assert orderByExpressions != null;
if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
- return minMaxValueBasedCombine();
+ return tryMinMaxValueBasedCombine(orderByExpressions);
} else {
+ // The first order-by expression is not an identifier (column), so the min/max value based combine does not apply;
+ // fall back to the default combine (process all segments)
return super.getNextBlock();
}
}
- private IntermediateResultsBlock minMaxValueBasedCombine() {
- List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
- assert orderByExpressions != null;
+ private IntermediateResultsBlock tryMinMaxValueBasedCombine(List<OrderByExpressionContext> orderByExpressions) {
int numOrderByExpressions = orderByExpressions.size();
assert numOrderByExpressions > 0;
OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
@@ -132,201 +128,14 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
});
}
} catch (Exception e) {
- // Fall back to the default combine (process all segments) when segments have different data types for the first
- // order-by column
+ // Fall back to the default combine (process all segments) if there are any exceptions.
LOGGER.warn("Segments have different data types for the first order-by column: {}, using the default combine",
firstOrderByColumn);
return super.getNextBlock();
}
- int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
- AtomicReference<Comparable> globalBoundaryValue = new AtomicReference<>();
-
- // Use a BlockingQueue to store the per-segment result
- BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
- // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
- AtomicInteger numOperatorsSkipped = new AtomicInteger();
- // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
- // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
- // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
- // behavior (even JVM crash) when processing queries against it.
- Phaser phaser = new Phaser(1);
-
- Future[] futures = new Future[numThreads];
- for (int i = 0; i < numThreads; i++) {
- int threadIndex = i;
- futures[i] = _executorService.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- try {
- // Register the thread to the phaser
- // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
- // means the query execution has finished, and the main thread has deregistered itself and returned
- // the result. Directly return as no execution result will be taken.
- if (phaser.register() < 0) {
- return;
- }
-
- // Keep a boundary value for the thread
- // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
- // value is updated after processing the segment, while global boundary value is updated after the
- // segment result is merged.
- Comparable threadBoundaryValue = null;
-
- for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
- // Calculate the boundary value from global boundary and thread boundary
- Comparable boundaryValue = globalBoundaryValue.get();
- if (boundaryValue == null) {
- boundaryValue = threadBoundaryValue;
- } else {
- if (threadBoundaryValue != null) {
- if (asc) {
- if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
- boundaryValue = threadBoundaryValue;
- }
- } else {
- if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
- boundaryValue = threadBoundaryValue;
- }
- }
- }
- }
-
- // Check if the segment can be skipped
- MinMaxValueContext minMaxValueContext = minMaxValueContexts.get(operatorIndex);
- if (boundaryValue != null) {
- if (asc) {
- // For ascending order, no need to process more segments if the column min value is larger than the
- // boundary value, or is equal to the boundary value and the there is only one order-by expression
- if (minMaxValueContext._minValue != null) {
- int result = minMaxValueContext._minValue.compareTo(boundaryValue);
- if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
- numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
- blockingQueue.offer(LAST_RESULTS_BLOCK);
- return;
- }
- }
- } else {
- // For descending order, no need to process more segments if the column max value is smaller than the
- // boundary value, or is equal to the boundary value and the there is only one order-by expression
- if (minMaxValueContext._maxValue != null) {
- int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
- if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
- numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
- blockingQueue.offer(LAST_RESULTS_BLOCK);
- return;
- }
- }
- }
- }
-
- // Process the segment
- try {
- IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
- if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
- // Segment result has enough rows, update the boundary value
- assert selectionResult.peek() != null;
- Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
- if (boundaryValue == null) {
- boundaryValue = segmentBoundaryValue;
- } else {
- if (asc) {
- if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
- boundaryValue = segmentBoundaryValue;
- }
- } else {
- if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
- boundaryValue = segmentBoundaryValue;
- }
- }
- }
- }
- threadBoundaryValue = boundaryValue;
- blockingQueue.offer(resultsBlock);
- } catch (EarlyTerminationException e) {
- // Early-terminated by interruption (canceled by the main thread)
- return;
- } catch (Exception e) {
- // Caught exception, skip processing the remaining operators
- LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
- _queryContext, e);
- blockingQueue.offer(new IntermediateResultsBlock(e));
- return;
- }
- }
- } finally {
- phaser.arriveAndDeregister();
- }
- }
- });
- }
-
- IntermediateResultsBlock mergedBlock = null;
- try {
- int numBlocksMerged = 0;
- while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
- IntermediateResultsBlock blockToMerge =
- blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if (blockToMerge == null) {
- // Query times out, skip merging the remaining results blocks
- LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
- _queryContext);
- mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
- new TimeoutException("Timed out while polling results block")));
- break;
- }
- if (blockToMerge.getProcessingExceptions() != null) {
- // Caught exception while processing segment, skip merging the remaining results blocks and directly return
- // the exception
- mergedBlock = blockToMerge;
- break;
- }
- if (mergedBlock == null) {
- mergedBlock = blockToMerge;
- } else {
- if (blockToMerge != LAST_RESULTS_BLOCK) {
- mergeResultsBlocks(mergedBlock, blockToMerge);
- }
- }
- numBlocksMerged++;
-
- // Update the boundary value if enough rows are collected
- PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
- if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
- assert selectionResult.peek() != null;
- globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
- }
- }
- } catch (Exception e) {
- LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
- mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
- } finally {
- // Cancel all ongoing jobs
- for (Future future : futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- // Deregister the main thread and wait for all threads done
- phaser.awaitAdvance(phaser.arriveAndDeregister());
- }
-
- CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
- return mergedBlock;
- }
-
- private static class MinMaxValueContext {
- final SelectionOrderByOperator _operator;
- final Comparable _minValue;
- final Comparable _maxValue;
-
- MinMaxValueContext(SelectionOrderByOperator operator, String column) {
- _operator = operator;
- DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
- _minValue = dataSourceMetadata.getMinValue();
- _maxValue = dataSourceMetadata.getMaxValue();
- }
+ return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService, _endTimeMs,
+ minMaxValueContexts).getNextBlock();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org