You are viewing a plain text version of this content; the link to the canonical version was not preserved here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/23 17:35:06 UTC
[incubator-pinot] branch master updated: Early termination for combining selection order-by results (#5686)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 174eb4b Early termination for combining selection order-by results (#5686)
174eb4b is described below
commit 174eb4b99c1ccc0645e1ba47eb3f26a1c816efb5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jul 23 10:34:56 2020 -0700
Early termination for combining selection order-by results (#5686)
Split `CombineOperator` into 3 different combine operators:
- `SelectionOnlyCombineOperator`
- `SelectionOrderByCombineOperator`
- `AggregationOnlyCombineOperator`
(For aggregation group-by, the combine operators are renamed to `GroupByCombineOperator` and `GroupByOrderByCombineOperator`)
Re-implement the combine operator logic in `BaseCombineOperator` for the early-termination enhancement:
- Worker threads no longer perform the result merge, but insert the results block from each segment into the blocking queue
- Worker threads can early-terminate if the results block for one segment can satisfy the query
- Main thread will merge the results blocks from the worker threads, and early-terminate if the merged results block can satisfy the query
For `SelectionOnlyCombineOperator`:
- Only process 1 segment for `LIMIT 0`
- Early termination when enough rows are collected
For `SelectionOrderByCombineOperator`:
- If the first order-by expression is an identifier (column), sort segments by the column min/max value and early-terminate once the results for a segment cannot be part of the final result.
---
.../pinot/core/operator/CombineOperator.java | 226 -------------
.../pinot/core/operator/ExecutionStatistics.java | 46 +--
.../operator/blocks/IntermediateResultsBlock.java | 52 ++-
.../combine/AggregationOnlyCombineOperator.java | 58 ++++
.../core/operator/combine/BaseCombineOperator.java | 185 +++++++++++
.../operator/combine/CombineOperatorUtils.java | 74 +++++
.../GroupByCombineOperator.java} | 27 +-
.../GroupByOrderByCombineOperator.java} | 30 +-
.../combine/SelectionOnlyCombineOperator.java | 95 ++++++
.../combine/SelectionOrderByCombineOperator.java | 356 +++++++++++++++++++++
.../operator/query/SelectionOrderByOperator.java | 18 +-
.../apache/pinot/core/plan/CombinePlanNode.java | 36 ++-
.../pinot/core/query/reduce/CombineService.java | 145 ---------
.../{ => combine}/CombineSlowOperatorsTest.java | 36 ++-
.../combine/SelectionCombineOperatorTest.java | 243 ++++++++++++++
.../pinot/queries/BaseSingleValueQueriesTest.java | 12 +-
.../queries/SelectionOnlyEarlyTerminationTest.java | 124 -------
17 files changed, 1139 insertions(+), 624 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
deleted file mode 100644
index accd0f0..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.query.exception.EarlyTerminationException;
-import org.apache.pinot.core.query.reduce.CombineService;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
-import org.apache.pinot.core.util.trace.TraceRunnable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>CombineOperator</code> class is the operator to combine selection results and aggregation-only results.
- */
-@SuppressWarnings("rawtypes")
-public class CombineOperator extends BaseOperator<IntermediateResultsBlock> {
- private static final Logger LOGGER = LoggerFactory.getLogger(CombineOperator.class);
- private static final String OPERATOR_NAME = "CombineOperator";
-
- // Use at most 10 threads, or half of the available processors, for each query.
- // If there are fewer than 2 processors, use 1 thread.
- // Runtime.getRuntime().availableProcessors() may return value < 2 in container based environment, e.g. Kubernetes.
- public static final int MAX_NUM_THREADS_PER_QUERY =
- Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
-
- private final List<Operator> _operators;
- private final QueryContext _queryContext;
- private final ExecutorService _executorService;
- private final long _timeOutMs;
-
- public CombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
- long timeOutMs) {
- _operators = operators;
- _queryContext = queryContext;
- _executorService = executorService;
- _timeOutMs = timeOutMs;
- }
-
- @Override
- protected IntermediateResultsBlock getNextBlock() {
- long startTimeMs = System.currentTimeMillis();
- long endTimeMs = startTimeMs + _timeOutMs;
- int numOperators = _operators.size();
- // Try to use all MAX_NUM_THREADS_PER_QUERY threads for the query, but ensure each thread has at least one operator
- int numThreads = Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY);
-
- // We use a BlockingQueue to store the results for each operator group, and track if all operator groups are
- // finished by the query timeout, and cancel the unfinished futures (try to interrupt the execution if it already
- // started).
- // Besides the BlockingQueue, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or
- // interrupted) before the main thread returns. We need to ensure no execution is left before the main thread returns
- // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the
- // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash)
- // when executing queries against them.
- BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numThreads);
- Phaser phaser = new Phaser(1);
-
- // Submit operator group execution jobs
- Future[] futures = new Future[numThreads];
- for (int i = 0; i < numThreads; i++) {
- int index = i;
- futures[i] = _executorService.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- try {
- // Register the thread to the phaser.
- // If the phaser is terminated (returning negative value) when trying to register the thread, that means the
- // query execution has timed out, and the main thread has deregistered itself and returned the result.
- // Directly return as no execution result will be taken.
- if (phaser.register() < 0) {
- return;
- }
-
- IntermediateResultsBlock mergedBlock = (IntermediateResultsBlock) _operators.get(index).nextBlock();
- for (int i = index + numThreads; i < numOperators; i += numThreads) {
- if (isQuerySatisfied(_queryContext, mergedBlock)) {
- break;
- }
- IntermediateResultsBlock blockToMerge = (IntermediateResultsBlock) _operators.get(i).nextBlock();
- try {
- CombineService.mergeTwoBlocks(_queryContext, mergedBlock, blockToMerge);
- } catch (Exception e) {
- LOGGER.error("Caught exception while merging two blocks (step 1).", e);
- mergedBlock
- .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e));
- }
- }
- blockingQueue.offer(mergedBlock);
- } catch (EarlyTerminationException e) {
- // Early-terminated because query times out or is already satisfied
- } catch (Exception e) {
- LOGGER.error("Caught exception while executing query.", e);
- blockingQueue.offer(new IntermediateResultsBlock(e));
- } finally {
- phaser.arriveAndDeregister();
- }
- }
- });
- }
-
- // Submit operator groups merge job
- Future<IntermediateResultsBlock> mergedBlockFuture =
- _executorService.submit(new TraceCallable<IntermediateResultsBlock>() {
- @Override
- public IntermediateResultsBlock callJob()
- throws Exception {
- IntermediateResultsBlock mergedBlock =
- blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if (mergedBlock == null) {
- throw new TimeoutException("Timed out while polling result from first thread");
- }
- int numMergedBlocks = 1;
- while (numMergedBlocks < numThreads) {
- if (isQuerySatisfied(_queryContext, mergedBlock)) {
- break;
- }
- IntermediateResultsBlock blockToMerge =
- blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if (blockToMerge == null) {
- throw new TimeoutException("Timed out while polling result from thread: " + numMergedBlocks);
- }
- try {
- CombineService.mergeTwoBlocks(_queryContext, mergedBlock, blockToMerge);
- } catch (Exception e) {
- LOGGER.error("Caught exception while merging two blocks (step 2).", e);
- mergedBlock
- .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e));
- }
- numMergedBlocks++;
- }
- return mergedBlock;
- }
- });
-
- // Get merge results.
- IntermediateResultsBlock mergedBlock;
- try {
- mergedBlock = mergedBlockFuture.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOGGER.error("Caught InterruptedException. (queryContext = {})", _queryContext, e);
- mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.FUTURE_CALL_ERROR, e));
- } catch (ExecutionException e) {
- LOGGER.error("Caught ExecutionException. (queryContext = {})", _queryContext, e);
- mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e));
- } catch (TimeoutException e) {
- LOGGER.error("Caught TimeoutException. (queryContext = {})", _queryContext, e);
- mergedBlockFuture.cancel(true);
- mergedBlock =
- new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, e));
- } finally {
- // Cancel all ongoing jobs
- for (Future future : futures) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- // Deregister the main thread and wait for all threads done
- phaser.awaitAdvance(phaser.arriveAndDeregister());
- }
-
- // Update execution statistics.
- ExecutionStatistics executionStatistics = new ExecutionStatistics();
- for (Operator operator : _operators) {
- executionStatistics.merge(operator.getExecutionStatistics());
- }
- mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
- mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
- mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
- mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs());
- mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
- mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
-
- return mergedBlock;
- }
-
- /**
- * Returns {@code true} if the query is already satisfied with the IntermediateResultsBlock so that there is no need
- * to process more segments, {@code false} otherwise.
- * <p>For selection-only query, the query is satisfied when enough records are gathered.
- */
- private boolean isQuerySatisfied(QueryContext queryContext, IntermediateResultsBlock mergedBlock) {
- if (!QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getOrderByExpressions() == null) {
- // Selection-only
- Collection<Object[]> selectionResult = mergedBlock.getSelectionResult();
- return selectionResult != null && selectionResult.size() >= queryContext.getLimit();
- }
- return false;
- }
-
- @Override
- public String getOperatorName() {
- return OPERATOR_NAME;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java
index 1c99d24..04c2665 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java
@@ -23,7 +23,7 @@ package org.apache.pinot.core.operator;
*/
public class ExecutionStatistics {
// The number of documents scanned post filtering.
- private long _numDocsScanned;
+ private final long _numDocsScanned;
// The number of entries (single value entry contains 1 value, multi-value entry may contain multiple values) scanned
// in the filtering phase of the query execution: could be larger than the total scanned doc num because of multiple
// filtering predicates and multi-value entry.
@@ -31,15 +31,13 @@ public class ExecutionStatistics {
// _numEntriesScannedInFilter counts values in a multi-value entry multiple times whereas in
// _numEntriesScannedPostFilter counts all values in a multi-value entry only once. We can add a new stats
// _numValuesScannedInFilter to replace _numEntriesScannedInFilter.
- private long _numEntriesScannedInFilter;
+ private final long _numEntriesScannedInFilter;
// Equal to numDocsScanned * numberProjectedColumns.
- private long _numEntriesScannedPostFilter;
- private long _numTotalDocs;
- private long _numSegmentsProcessed;
- private long _numSegmentsMatched;
+ private final long _numEntriesScannedPostFilter;
- public ExecutionStatistics() {
- }
+ // TODO: Remove _numTotalDocs because it is not execution stats, and it is not set from the operators because they
+ // don't cover the pruned segments
+ private final long _numTotalDocs;
public ExecutionStatistics(long numDocsScanned, long numEntriesScannedInFilter, long numEntriesScannedPostFilter,
long numTotalDocs) {
@@ -47,8 +45,6 @@ public class ExecutionStatistics {
_numEntriesScannedInFilter = numEntriesScannedInFilter;
_numEntriesScannedPostFilter = numEntriesScannedPostFilter;
_numTotalDocs = numTotalDocs;
- _numSegmentsProcessed = 1;
- _numSegmentsMatched = (numDocsScanned == 0) ? 0 : 1;
}
public long getNumDocsScanned() {
@@ -66,34 +62,4 @@ public class ExecutionStatistics {
public long getNumTotalDocs() {
return _numTotalDocs;
}
-
- public long getNumSegmentsProcessed() {
- return _numSegmentsProcessed;
- }
-
- public long getNumSegmentsMatched() {
- return _numSegmentsMatched;
- }
-
- /**
- * Merge another execution statistics into the current one.
- *
- * @param executionStatisticsToMerge execution statistics to merge.
- */
- public void merge(ExecutionStatistics executionStatisticsToMerge) {
- _numDocsScanned += executionStatisticsToMerge._numDocsScanned;
- _numEntriesScannedInFilter += executionStatisticsToMerge._numEntriesScannedInFilter;
- _numEntriesScannedPostFilter += executionStatisticsToMerge._numEntriesScannedPostFilter;
- _numTotalDocs += executionStatisticsToMerge._numTotalDocs;
- _numSegmentsProcessed += executionStatisticsToMerge._numSegmentsProcessed;
- _numSegmentsMatched += executionStatisticsToMerge._numSegmentsMatched;
- }
-
- @Override
- public String toString() {
- return "Execution Statistics:" + "\n numDocsScanned: " + _numDocsScanned + "\n numEntriesScannedInFilter: "
- + _numEntriesScannedInFilter + "\n numEntriesScannedPostFilter: " + _numEntriesScannedPostFilter
- + "\n numTotalDocs: " + _numTotalDocs + "\n numSegmentsProcessed: " + _numSegmentsProcessed
- + "\n numSegmentsMatched: " + _numSegmentsMatched;
- }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 63efe7c..d0eb2e2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.blocks;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -60,8 +61,8 @@ public class IntermediateResultsBlock implements Block {
private long _numEntriesScannedInFilter;
private long _numEntriesScannedPostFilter;
private long _numTotalDocs;
- private long _numSegmentsProcessed;
- private long _numSegmentsMatched;
+ private int _numSegmentsProcessed;
+ private int _numSegmentsMatched;
private boolean _numGroupsLimitReached;
private Table _table;
@@ -198,19 +199,11 @@ public class IntermediateResultsBlock implements Block {
_numEntriesScannedPostFilter = numEntriesScannedPostFilter;
}
- public long getNumSegmentsProcessed() {
- return _numSegmentsProcessed;
- }
-
- public void setNumSegmentsProcessed(long numSegmentsProcessed) {
+ public void setNumSegmentsProcessed(int numSegmentsProcessed) {
_numSegmentsProcessed = numSegmentsProcessed;
}
- public long getNumSegmentsMatched() {
- return _numSegmentsMatched;
- }
-
- public void setNumSegmentsMatched(long numSegmentsMatched) {
+ public void setNumSegmentsMatched(int numSegmentsMatched) {
_numSegmentsMatched = numSegmentsMatched;
}
@@ -222,6 +215,41 @@ public class IntermediateResultsBlock implements Block {
_numGroupsLimitReached = numGroupsLimitReached;
}
+ @VisibleForTesting // Test-only read accessors for the statistics and flags stored in this results block
+ public long getNumDocsScanned() {
+ return _numDocsScanned;
+ }
+
+ @VisibleForTesting
+ public long getNumEntriesScannedInFilter() {
+ return _numEntriesScannedInFilter;
+ }
+
+ @VisibleForTesting
+ public long getNumEntriesScannedPostFilter() {
+ return _numEntriesScannedPostFilter;
+ }
+
+ @VisibleForTesting
+ public int getNumSegmentsProcessed() {
+ return _numSegmentsProcessed;
+ }
+
+ @VisibleForTesting
+ public int getNumSegmentsMatched() {
+ return _numSegmentsMatched;
+ }
+
+ @VisibleForTesting
+ public long getNumTotalDocs() {
+ return _numTotalDocs;
+ }
+
+ @VisibleForTesting
+ public boolean isNumGroupsLimitReached() {
+ return _numGroupsLimitReached;
+ }
+
public DataTable getDataTable()
throws Exception {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
new file mode 100644
index 0000000..83a474e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Combine operator for aggregation-only queries; merges per-segment results in place via AggregationFunction.merge().
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class AggregationOnlyCombineOperator extends BaseCombineOperator {
+ private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator";
+
+ public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
+ ExecutorService executorService, long timeOutMs) {
+ super(operators, queryContext, executorService, timeOutMs);
+ }
+
+ @Override
+ public String getOperatorName() {
+ return OPERATOR_NAME;
+ }
+
+ @Override
+ protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+ AggregationFunction[] aggregationFunctions = mergedBlock.getAggregationFunctions();
+ List<Object> mergedResults = mergedBlock.getAggregationResult();
+ List<Object> resultsToMerge = blockToMerge.getAggregationResult();
+ assert aggregationFunctions != null && mergedResults != null && resultsToMerge != null; // NOTE(review): only checked with -ea; otherwise a null surfaces as an NPE below
+
+ int numAggregationFunctions = aggregationFunctions.length; // Results are positionally aligned with the aggregation functions
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ mergedResults.set(i, aggregationFunctions[i].merge(mergedResults.get(i), resultsToMerge.get(i)));
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
new file mode 100644
index 0000000..2d10a35
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base implementation of the combine operator.
+ * <p>Combine operator uses multiple worker threads to process segments in parallel, and uses the main thread to merge
+ * the results blocks from the processed segments. It can early-terminate the query to save system resources if it
+ * detects that the merged results can already satisfy the query, or the query is already errored out or timed out.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+ protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
+
+ protected final List<Operator> _operators;
+ protected final QueryContext _queryContext;
+ protected final ExecutorService _executorService;
+ protected final long _timeOutMs; // Timeout budget in milliseconds, measured from the start of getNextBlock()
+
+ public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+ long timeOutMs) {
+ _operators = operators;
+ _queryContext = queryContext;
+ _executorService = executorService;
+ _timeOutMs = timeOutMs;
+ }
+
+ @Override
+ protected IntermediateResultsBlock getNextBlock() {
+ long startTimeMs = System.currentTimeMillis();
+ long endTimeMs = startTimeMs + _timeOutMs;
+ int numOperators = _operators.size();
+ int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
+
+ // Use a BlockingQueue to store the per-segment results; each operator offers at most one block, so offer() never fails
+ BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators); // NOTE(review): throws IllegalArgumentException if _operators is empty
+ // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+ // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+ // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+ // behavior (even JVM crash) when processing queries against it.
+ Phaser phaser = new Phaser(1);
+
+ Future[] futures = new Future[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ int threadIndex = i;
+ futures[i] = _executorService.submit(new TraceRunnable() {
+ @Override
+ public void runJob() {
+ try {
+ // Register the thread to the phaser
+ // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+ // means the query execution has finished, and the main thread has deregistered itself and returned
+ // the result. Directly return as no execution result will be taken.
+ if (phaser.register() < 0) {
+ return;
+ }
+
+ for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) { // Thread t handles operators t, t + numThreads, ...
+ try {
+ IntermediateResultsBlock resultsBlock =
+ (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+ if (isQuerySatisfied(resultsBlock)) {
+ // Query is satisfied, skip processing the remaining segments assigned to this thread
+ blockingQueue.offer(resultsBlock);
+ return;
+ } else {
+ blockingQueue.offer(resultsBlock);
+ }
+ } catch (EarlyTerminationException e) {
+ // Early-terminated by interruption (canceled by the main thread)
+ return;
+ } catch (Exception e) {
+ // Caught exception, skip processing the remaining operators. NOTE(review): an Error is not caught here, so no block is offered and the main thread only notices at the timeout
+ LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
+ _queryContext, e);
+ blockingQueue.offer(new IntermediateResultsBlock(e));
+ return;
+ }
+ }
+ } finally {
+ phaser.arriveAndDeregister();
+ }
+ }
+ });
+ }
+
+ IntermediateResultsBlock mergedBlock = null;
+ try {
+ int numBlocksMerged = 0;
+ while (numBlocksMerged < numOperators) {
+ IntermediateResultsBlock blockToMerge =
+ blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ if (blockToMerge == null) {
+ // Query times out, skip merging the remaining results blocks
+ LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+ _queryContext);
+ mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
+ break;
+ }
+ if (blockToMerge.getProcessingExceptions() != null) {
+ // Caught exception while processing segment, skip merging the remaining results blocks and directly return
+ // the exception
+ mergedBlock = blockToMerge;
+ break;
+ }
+ if (mergedBlock == null) {
+ mergedBlock = blockToMerge;
+ } else {
+ mergeResultsBlocks(mergedBlock, blockToMerge);
+ }
+ numBlocksMerged++;
+ if (isQuerySatisfied(mergedBlock)) {
+ // Query is satisfied, skip merging the remaining results blocks
+ break;
+ }
+ }
+ } catch (Exception e) { // NOTE(review): also catches InterruptedException from poll() without restoring the interrupt flag
+ LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+ mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+ } finally {
+ // Cancel all ongoing jobs
+ for (Future future : futures) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ // Deregister the main thread and wait for all threads done
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
+ }
+
+ CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators); // NOTE(review): numSegmentsProcessed becomes _operators.size() even if segments were skipped
+ return mergedBlock;
+ }
+
+ /**
+ * Can be overridden for early termination. NOTE: if a worker stops on a satisfying per-segment block, any merged block containing it must also satisfy the query; otherwise the main thread waits until timeout for blocks that are never produced.
+ */
+ protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) {
+ return false;
+ }
+
+ /**
+ * Merge an IntermediateResultsBlock into the main IntermediateResultsBlock.
+ * <p>NOTE: {@code blockToMerge} should contain the result for a segment without any exception. The errored segment
+ * result is already handled.
+ */
+ protected abstract void mergeResultsBlocks(IntermediateResultsBlock mergedBlock,
+ IntermediateResultsBlock blockToMerge);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
new file mode 100644
index 0000000..fd7737f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+
+
+@SuppressWarnings("rawtypes")
+public class CombineOperatorUtils { // Static helpers shared by the combine operators
+ private CombineOperatorUtils() {
+ }
+
+ /**
+ * Use at most 10 threads, or half of the available processors, for each query. If there are fewer than 2 processors, use 1 thread.
+ * <p>NOTE: Runtime.getRuntime().availableProcessors() may return value < 2 in container based environment, e.g.
+ * Kubernetes.
+ */
+ public static final int MAX_NUM_THREADS_PER_QUERY =
+ Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
+
+ /**
+ * Returns the number of threads used to execute the query in parallel: at most one per operator, capped at MAX_NUM_THREADS_PER_QUERY.
+ */
+ public static int getNumThreadsForQuery(int numOperators) {
+ return Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY);
+ }
+
+ /**
+ * Sets the execution statistics summed across all the operators into the results block; a segment counts as matched when it scanned at least one doc.
+ */
+ public static void setExecutionStatistics(IntermediateResultsBlock resultsBlock, List<Operator> operators) {
+ int numSegmentsProcessed = operators.size(); // NOTE(review): counts every operator passed in, whether or not it was executed
+ int numSegmentsMatched = 0;
+ long numDocsScanned = 0;
+ long numEntriesScannedInFilter = 0;
+ long numEntriesScannedPostFilter = 0;
+ long numTotalDocs = 0;
+ for (Operator operator : operators) {
+ ExecutionStatistics executionStatistics = operator.getExecutionStatistics();
+ if (executionStatistics.getNumDocsScanned() > 0) {
+ numSegmentsMatched++;
+ }
+ numDocsScanned += executionStatistics.getNumDocsScanned();
+ numEntriesScannedInFilter += executionStatistics.getNumEntriesScannedInFilter();
+ numEntriesScannedPostFilter += executionStatistics.getNumEntriesScannedPostFilter();
+ numTotalDocs += executionStatistics.getNumTotalDocs();
+ }
+ resultsBlock.setNumSegmentsProcessed(numSegmentsProcessed);
+ resultsBlock.setNumSegmentsMatched(numSegmentsMatched);
+ resultsBlock.setNumDocsScanned(numDocsScanned);
+ resultsBlock.setNumEntriesScannedInFilter(numEntriesScannedInFilter);
+ resultsBlock.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter);
+ resultsBlock.setNumTotalDocs(numTotalDocs);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
similarity index 90%
rename from pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 5b6b9e6..476b459 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.Iterator;
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
@@ -48,12 +49,15 @@ import org.slf4j.LoggerFactory;
/**
- * The <code>CombineGroupByOperator</code> class is the operator to combine aggregation group-by results.
+ * Combine operator for aggregation group-by queries with PQL semantics.
+ * TODO:
+ * - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using all threads
+ * - Try to extend BaseCombineOperator to reduce duplicate code
*/
@SuppressWarnings("rawtypes")
-public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBlock> {
- private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOperator.class);
- private static final String OPERATOR_NAME = "CombineGroupByOperator";
+public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupByCombineOperator.class);
+ private static final String OPERATOR_NAME = "GroupByCombineOperator";
// Use a higher limit for groups stored across segments. For most cases, most groups from each segment should be the
// same, thus the total number of groups across segments should be equal or slightly higher than the number of groups
@@ -69,7 +73,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
private final int _innerSegmentNumGroupsLimit;
private final int _interSegmentNumGroupsLimit;
- public CombineGroupByOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+ public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
long timeOutMs, int innerSegmentNumGroupsLimit) {
_operators = operators;
_queryContext = queryContext;
@@ -208,16 +212,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc
}
// Set the execution statistics.
- ExecutionStatistics executionStatistics = new ExecutionStatistics();
- for (Operator operator : _operators) {
- executionStatistics.merge(operator.getExecutionStatistics());
- }
- mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
- mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
- mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
- mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
- mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
- mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs());
+ CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
// TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
// are comparing number of groups across segments with the groups limit for each segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
similarity index 88%
rename from pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index f0001af..627e9f4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.Iterator;
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
@@ -53,15 +54,15 @@ import org.slf4j.LoggerFactory;
/**
- * The <code>CombineGroupByOrderByOperator</code> class is the operator to combine aggregation results with group-by and order by.
+ * Combine operator for aggregation group-by queries with SQL semantics.
+ * TODO:
+ * - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using all threads
+ * - Try to extend BaseCombineOperator to reduce duplicate code
*/
-// TODO: this class has a lot of duplication with {@link CombineGroupByOperator}.
-// These 2 classes can be combined into one
-// For the first iteration of Order By support, these will be separate
@SuppressWarnings("rawtypes")
-public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResultsBlock> {
- private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOrderByOperator.class);
- private static final String OPERATOR_NAME = "CombineGroupByOrderByOperator";
+public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
+ private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
private final List<Operator> _operators;
private final QueryContext _queryContext;
@@ -72,7 +73,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
private DataSchema _dataSchema;
private ConcurrentIndexedTable _indexedTable;
- public CombineGroupByOrderByOperator(List<Operator> operators, QueryContext queryContext,
+ public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long timeOutMs) {
_operators = operators;
_queryContext = queryContext;
@@ -220,16 +221,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
}
// Set the execution statistics.
- ExecutionStatistics executionStatistics = new ExecutionStatistics();
- for (Operator operator : _operators) {
- executionStatistics.merge(operator.getExecutionStatistics());
- }
- mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
- mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
- mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
- mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
- mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
- mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs());
+ CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
if (_indexedTable.size() >= _indexedTableCapacity) {
mergedBlock.setNumGroupsLimitReached(true);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
new file mode 100644
index 0000000..07aad33
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+
+/**
+ * Combine operator for selection only queries.
+ * <p>For query with LIMIT 0, directly use main thread to process one segment to get the data schema of the query.
+ * <p>Query can be early-terminated when enough documents have been collected to fulfill the LIMIT requirement.
+ * <p>NOTE: Selection order-by query with LIMIT 0 is treated as selection only query.
+ */
+@SuppressWarnings("rawtypes")
+public class SelectionOnlyCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "SelectionOnlyCombineOperator";
+
+  // Number of rows to collect across all the segments (the query LIMIT)
+  private final int _numRowsToKeep;
+
+  public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long timeOutMs) {
+    super(operators, queryContext, executorService, timeOutMs);
+    _numRowsToKeep = queryContext.getLimit();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  /**
+   * For LIMIT 0 queries, processes only the first segment on the calling thread (to obtain the data schema);
+   * otherwise delegates to the default combine.
+   */
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    // For LIMIT 0 query, only process one segment to get the data schema
+    if (_numRowsToKeep == 0) {
+      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(0).nextBlock();
+      CombineOperatorUtils.setExecutionStatistics(resultsBlock, _operators);
+      return resultsBlock;
+    }
+
+    return super.getNextBlock();
+  }
+
+  /**
+   * Returns {@code true} once the results block holds at least LIMIT rows, in which case no more segments need to be
+   * processed.
+   */
+  @Override
+  protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) {
+    Collection<Object[]> selectionResult = resultsBlock.getSelectionResult();
+    assert selectionResult != null;
+    // NOTE: Use '>=' instead of '==' so that a block holding more rows than the LIMIT is still treated as satisfied
+    //       instead of silently disabling the early termination.
+    return selectionResult.size() >= _numRowsToKeep;
+  }
+
+  /**
+   * Appends the rows of {@code blockToMerge} into {@code mergedBlock} (without ordering, up to LIMIT rows). When the
+   * data schemas of the two blocks differ, the block to merge is dropped and a MERGE_RESPONSE_ERROR is recorded on the
+   * merged block.
+   */
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    DataSchema mergedDataSchema = mergedBlock.getDataSchema();
+    DataSchema dataSchemaToMerge = blockToMerge.getDataSchema();
+    assert mergedDataSchema != null && dataSchemaToMerge != null;
+    if (!mergedDataSchema.equals(dataSchemaToMerge)) {
+      String errorMessage = String
+          .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge",
+              mergedDataSchema, dataSchemaToMerge);
+      // NOTE: This is segment level log, so log at debug level to prevent flooding the log.
+      LOGGER.debug(errorMessage);
+      mergedBlock
+          .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
+      return;
+    }
+
+    Collection<Object[]> mergedRows = mergedBlock.getSelectionResult();
+    Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult();
+    assert mergedRows != null && rowsToMerge != null;
+    SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
new file mode 100644
index 0000000..af5b18e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+
+
+/**
+ * Combine operator for selection order-by queries.
+ * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on
+ * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
+ * <ul>
+ *   <li>1. Sort all the segments by the column min/max value</li>
+ *   <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
+ *   <li>3. Skip processing the segments that cannot add values to the final result</li>
+ * </ul>
+ * <p>If the segments cannot be sorted on the column min/max value, falls back to the default combine which processes
+ * all the segments.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SelectionOrderByCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
+
+  // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
+  // special IntermediateResultsBlock into the BlockingQueue to awake the main thread.
+  // NOTE: This instance is static and shared, so it must never be used as (or merged into) the merged block.
+  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
+      new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
+          Collections.emptyList());
+
+  // LIMIT + OFFSET: number of rows the per-segment results and the merged result need to keep
+  private final int _numRowsToKeep;
+
+  public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long timeOutMs) {
+    super(operators, queryContext, executorService, timeOutMs);
+    _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  /**
+   * Uses the min/max value based combine when the first order-by expression is an identifier (column), and the
+   * default combine otherwise.
+   */
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
+      return minMaxValueBasedCombine();
+    } else {
+      return super.getNextBlock();
+    }
+  }
+
+  /**
+   * Combines the segment results by processing the segments in the order of the min/max value of the first order-by
+   * column, and stops processing segments once they cannot add rows to the final result.
+   */
+  private IntermediateResultsBlock minMaxValueBasedCombine() {
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + _timeOutMs;
+
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    int numOrderByExpressions = orderByExpressions.size();
+    assert numOrderByExpressions > 0;
+    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
+    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
+    String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier();
+    boolean asc = firstOrderByExpression.isAsc();
+
+    int numOperators = _operators.size();
+    List<MinMaxValueContext> minMaxValueContexts = new ArrayList<>(numOperators);
+    try {
+      // NOTE: Build the contexts inside the try block so that failures while reading the column min/max value also
+      //       fall back to the default combine instead of failing the query.
+      for (Operator operator : _operators) {
+        minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
+      }
+      if (asc) {
+        // For ascending order, sort on column min value in ascending order
+        minMaxValueContexts.sort((o1, o2) -> {
+          // Put segments without column min value in the front because we always need to process them
+          if (o1._minValue == null) {
+            return o2._minValue == null ? 0 : -1;
+          }
+          if (o2._minValue == null) {
+            return 1;
+          }
+          return o1._minValue.compareTo(o2._minValue);
+        });
+      } else {
+        // For descending order, sort on column max value in descending order
+        minMaxValueContexts.sort((o1, o2) -> {
+          // Put segments without column max value in the front because we always need to process them
+          if (o1._maxValue == null) {
+            return o2._maxValue == null ? 0 : -1;
+          }
+          if (o2._maxValue == null) {
+            return 1;
+          }
+          return o2._maxValue.compareTo(o1._maxValue);
+        });
+      }
+    } catch (Exception e) {
+      // Fall back to the default combine (process all segments), e.g. when segments have different data types for the
+      // first order-by column
+      LOGGER.warn("Caught exception while sorting segments on the min/max value of the first order-by column: {}, "
+          + "using the default combine", firstOrderByColumn, e);
+      return super.getNextBlock();
+    }
+
+    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
+    AtomicReference<Comparable> globalBoundaryValue = new AtomicReference<>();
+
+    // Use a BlockingQueue to store the per-segment result
+    BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators);
+    // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
+    AtomicInteger numOperatorsSkipped = new AtomicInteger();
+    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+    // behavior (even JVM crash) when processing queries against it.
+    Phaser phaser = new Phaser(1);
+
+    Future[] futures = new Future[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      int threadIndex = i;
+      futures[i] = _executorService.submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          // Register the thread to the phaser
+          // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+          //       means the query execution has finished, and the main thread has deregistered itself and returned
+          //       the result. Directly return as no execution result will be taken.
+          if (phaser.register() < 0) {
+            return;
+          }
+          try {
+            // Keep a boundary value for the thread
+            // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
+            //       value is updated after processing the segment, while global boundary value is updated after the
+            //       segment result is merged.
+            Comparable threadBoundaryValue = null;
+
+            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
+              MinMaxValueContext minMaxValueContext = minMaxValueContexts.get(operatorIndex);
+              // NOTE: The boundary value comparisons can also throw (e.g. ClassCastException), so all the per-segment
+              //       work is guarded to always report failures to the main thread instead of leaving it waiting
+              //       until the query times out.
+              try {
+                // Calculate the boundary value from global boundary and thread boundary
+                Comparable boundaryValue =
+                    getTighterBoundaryValue(globalBoundaryValue.get(), threadBoundaryValue, asc);
+
+                // Check if the segment (and all the remaining segments of this thread) can be skipped
+                if (boundaryValue != null
+                    && canSkipSegment(minMaxValueContext, boundaryValue, asc, numOrderByExpressions)) {
+                  // The remaining segments after the current one are counted as skipped, and LAST_RESULTS_BLOCK
+                  // accounts for the current one
+                  numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads);
+                  blockingQueue.offer(LAST_RESULTS_BLOCK);
+                  return;
+                }
+
+                // Process the segment
+                IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+                PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+                if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+                  // Segment result has enough rows, update the boundary value
+                  assert selectionResult.peek() != null;
+                  boundaryValue = getTighterBoundaryValue(boundaryValue, (Comparable) selectionResult.peek()[0], asc);
+                }
+                threadBoundaryValue = boundaryValue;
+                blockingQueue.offer(resultsBlock);
+              } catch (EarlyTerminationException e) {
+                // Early-terminated by interruption (canceled by the main thread)
+                return;
+              } catch (Exception e) {
+                // Caught exception, skip processing the remaining operators
+                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
+                    _queryContext, e);
+                blockingQueue.offer(new IntermediateResultsBlock(e));
+                return;
+              }
+            }
+          } finally {
+            phaser.arriveAndDeregister();
+          }
+        }
+      });
+    }
+
+    IntermediateResultsBlock mergedBlock = null;
+    try {
+      int numBlocksMerged = 0;
+      while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
+        IntermediateResultsBlock blockToMerge =
+            blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        if (blockToMerge == null) {
+          // Query times out, skip merging the remaining results blocks
+          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+              _queryContext);
+          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
+          break;
+        }
+        if (blockToMerge.getProcessingExceptions() != null) {
+          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
+          // the exception
+          mergedBlock = blockToMerge;
+          break;
+        }
+        // LAST_RESULTS_BLOCK carries no rows and only accounts for the segments skipped by a thread. Never adopt it as
+        // the merged block because it is a shared static instance.
+        if (blockToMerge != LAST_RESULTS_BLOCK) {
+          if (mergedBlock == null) {
+            mergedBlock = blockToMerge;
+          } else {
+            mergeResultsBlocks(mergedBlock, blockToMerge);
+          }
+        }
+        numBlocksMerged++;
+
+        // Update the boundary value if enough rows are collected
+        if (mergedBlock != null) {
+          PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+          if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+            assert selectionResult.peek() != null;
+            globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
+    }
+
+    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+    return mergedBlock;
+  }
+
+  /**
+   * Returns the tighter of the two boundary values: the smaller one for ascending order, the larger one for descending
+   * order. A {@code null} value means no boundary, in which case the other value is returned. On a tie,
+   * {@code value1} is returned.
+   */
+  private static Comparable getTighterBoundaryValue(Comparable value1, Comparable value2, boolean asc) {
+    if (value1 == null) {
+      return value2;
+    }
+    if (value2 == null) {
+      return value1;
+    }
+    int result = value2.compareTo(value1);
+    if (asc) {
+      return result < 0 ? value2 : value1;
+    } else {
+      return result > 0 ? value2 : value1;
+    }
+  }
+
+  /**
+   * Returns {@code true} if the segment cannot add rows to the final result given the boundary value. Because the
+   * segments are sorted on the column min/max value, the segments after it in the sorted order cannot either.
+   * <p>For ascending order, the segment can be skipped if the column min value is larger than the boundary value, or
+   * is equal to the boundary value and there is only one order-by expression. Descending order is symmetric on the
+   * column max value. Segments without the column min/max value are never skipped.
+   */
+  private static boolean canSkipSegment(MinMaxValueContext minMaxValueContext, Comparable boundaryValue, boolean asc,
+      int numOrderByExpressions) {
+    if (asc) {
+      if (minMaxValueContext._minValue == null) {
+        return false;
+      }
+      int result = minMaxValueContext._minValue.compareTo(boundaryValue);
+      return result > 0 || (result == 0 && numOrderByExpressions == 1);
+    } else {
+      if (minMaxValueContext._maxValue == null) {
+        return false;
+      }
+      int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
+      return result < 0 || (result == 0 && numOrderByExpressions == 1);
+    }
+  }
+
+  /**
+   * Pairs a segment operator with the min/max value of the first order-by column read from the column's data source
+   * metadata (either value may be {@code null}).
+   */
+  private static class MinMaxValueContext {
+    final SelectionOrderByOperator _operator;
+    final Comparable _minValue;
+    final Comparable _maxValue;
+
+    MinMaxValueContext(SelectionOrderByOperator operator, String column) {
+      _operator = operator;
+      DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+      _minValue = dataSourceMetadata.getMinValue();
+      _maxValue = dataSourceMetadata.getMaxValue();
+    }
+  }
+
+  /**
+   * Merges the rows of {@code blockToMerge} into the priority queue of {@code mergedBlock}, keeping at most
+   * LIMIT + OFFSET rows. When the data schemas of the two blocks differ, the block to merge is dropped and a
+   * MERGE_RESPONSE_ERROR is recorded on the merged block.
+   */
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    DataSchema mergedDataSchema = mergedBlock.getDataSchema();
+    DataSchema dataSchemaToMerge = blockToMerge.getDataSchema();
+    assert mergedDataSchema != null && dataSchemaToMerge != null;
+    if (!mergedDataSchema.equals(dataSchemaToMerge)) {
+      String errorMessage = String
+          .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge",
+              mergedDataSchema, dataSchemaToMerge);
+      // NOTE: This is segment level log, so log at debug level to prevent flooding the log.
+      LOGGER.debug(errorMessage);
+      mergedBlock
+          .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
+      return;
+    }
+
+    PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+    Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult();
+    assert mergedRows != null && rowsToMerge != null;
+    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index e13ef48..637672c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -166,6 +166,10 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
};
}
+ public IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
@Override
protected IntermediateResultsBlock getNextBlock() {
if (_expressions.size() == _orderByExpressions.size()) {
@@ -183,6 +187,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
// Fetch all the expressions and insert them into the priority queue
BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+ int numColumnsProjected = _transformOperator.getNumColumnsProjected();
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
for (int i = 0; i < numExpressions; i++) {
@@ -191,12 +196,12 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
}
RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
int numDocsFetched = transformBlock.getNumDocs();
- _numDocsScanned += numDocsFetched;
for (int i = 0; i < numDocsFetched; i++) {
SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep);
}
+ _numDocsScanned += numDocsFetched;
+ _numEntriesScannedPostFilter += numDocsFetched * numColumnsProjected;
}
- _numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected();
// Create the data schema
String[] columnNames = new String[numExpressions];
@@ -221,6 +226,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
// Fetch the order-by expressions and docIds and insert them into the priority queue
BlockValSet[] blockValSets = new BlockValSet[numOrderByExpressions + 1];
+ int numColumnsProjected = _transformOperator.getNumColumnsProjected();
TransformBlock transformBlock;
while ((transformBlock = _transformOperator.nextBlock()) != null) {
for (int i = 0; i < numOrderByExpressions; i++) {
@@ -230,7 +236,6 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
blockValSets[numOrderByExpressions] = transformBlock.getBlockValueSet(BuiltInVirtualColumn.DOCID);
RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
int numDocsFetched = transformBlock.getNumDocs();
- _numDocsScanned += numDocsFetched;
for (int i = 0; i < numDocsFetched; i++) {
// NOTE: We pre-allocate the complete row so that we can fill up the non-order-by output expression values later
// without creating extra rows or re-constructing the priority queue. We can change the values in-place
@@ -239,6 +244,8 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
blockValueFetcher.getRow(i, row, 0);
SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
}
+ _numDocsScanned += numDocsFetched;
+ _numEntriesScannedPostFilter += numDocsFetched * numColumnsProjected;
}
// Copy the rows (shallow copy so that any modification will also be reflected to the priority queue) into a list,
@@ -261,6 +268,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
for (ExpressionContext expressionContext : nonOrderByExpressions) {
expressionContext.getColumns(columns);
}
+ int numColumns = columns.size();
Map<String, DataSource> dataSourceMap = new HashMap<>();
for (String column : columns) {
dataSourceMap.put(column, _indexSegment.getDataSource(column));
@@ -283,11 +291,9 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
for (int i = 0; i < numDocsFetched; i++) {
blockValueFetcher.getRow(i, rowList.get(rowBaseId + i), numOrderByExpressions);
}
+ _numEntriesScannedPostFilter += numDocsFetched * numColumns;
rowBaseId += numDocsFetched;
}
- _numEntriesScannedPostFilter =
- (long) _numDocsScanned * _transformOperator.getNumColumnsProjected() + (long) numRows * transformOperator
- .getNumColumnsProjected();
// Create the data schema
String[] columnNames = new String[numExpressions];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 71a40b3..4f01fea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -26,10 +26,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.CombineGroupByOperator;
-import org.apache.pinot.core.operator.CombineGroupByOrderByOperator;
-import org.apache.pinot.core.operator.CombineOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator;
+import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
+import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator;
import org.apache.pinot.core.query.exception.BadQueryRequestException;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
@@ -158,18 +160,26 @@ public class CombinePlanNode implements PlanNode {
}
}
- // TODO: use the same combine operator for both aggregation and selection query.
- if (QueryContextUtils.isAggregationQuery(_queryContext) && _queryContext.getGroupByExpressions() != null) {
- // Aggregation group-by query
- QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
- // new Combine operator only when GROUP_BY_MODE explicitly set to SQL
- if (queryOptions.isGroupByModeSQL()) {
- return new CombineGroupByOrderByOperator(operators, _queryContext, _executorService, _timeOutMs);
+ if (QueryContextUtils.isAggregationQuery(_queryContext)) {
+ if (_queryContext.getGroupByExpressions() == null) {
+ // Aggregation only
+ return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ } else {
+ // Aggregation group-by
+ QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
+ if (queryOptions.isGroupByModeSQL()) {
+ return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ }
+ return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
}
- return new CombineGroupByOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
} else {
- // Selection or aggregation only query
- return new CombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
+ // Selection only
+ return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ } else {
+ // Selection order-by
+ return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ }
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
deleted file mode 100644
index 80cf40e..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.reduce;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.PriorityQueue;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>CombineService</code> class provides the utility methods to combine {@link IntermediateResultsBlock}s.
- */
-@SuppressWarnings({"ConstantConditions", "rawtypes", "unchecked"})
-public class CombineService {
- private CombineService() {
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CombineService.class);
-
- public static void mergeTwoBlocks(QueryContext queryContext, IntermediateResultsBlock mergedBlock,
- IntermediateResultsBlock blockToMerge) {
- // Combine processing exceptions.
- List<ProcessingException> mergedProcessingExceptions = mergedBlock.getProcessingExceptions();
- List<ProcessingException> processingExceptionsToMerge = blockToMerge.getProcessingExceptions();
- if (mergedProcessingExceptions == null) {
- mergedBlock.setProcessingExceptions(processingExceptionsToMerge);
- } else if (processingExceptionsToMerge != null) {
- mergedProcessingExceptions.addAll(processingExceptionsToMerge);
- }
-
- // Combine result.
- if (QueryContextUtils.isAggregationQuery(queryContext)) {
- // NOTE: Aggregation group-by queries should not reach here.
- assert queryContext.getGroupByExpressions() == null;
-
- // Combine aggregation-only result.
- // Might be null if caught exception during query execution.
- List<Object> aggregationResultToMerge = blockToMerge.getAggregationResult();
- if (aggregationResultToMerge == null) {
- // No data in block to merge.
- return;
- }
-
- AggregationFunction[] mergedAggregationFunctions = mergedBlock.getAggregationFunctions();
- if (mergedAggregationFunctions == null) {
- // No data in merged block.
- mergedBlock.setAggregationFunctions(blockToMerge.getAggregationFunctions());
- mergedBlock.setAggregationResults(aggregationResultToMerge);
- } else {
- // Merge two blocks.
- List<Object> mergedAggregationResult = mergedBlock.getAggregationResult();
- int numAggregationFunctions = mergedAggregationFunctions.length;
- for (int i = 0; i < numAggregationFunctions; i++) {
- mergedAggregationResult.set(i,
- mergedAggregationFunctions[i].merge(mergedAggregationResult.get(i), aggregationResultToMerge.get(i)));
- }
- }
- } else {
- // Combine selection result.
-
- // Data schema will be null if exceptions caught during query processing.
- // Result set size will be zero if no row matches the predicate.
- DataSchema mergedBlockSchema = mergedBlock.getDataSchema();
- DataSchema blockToMergeSchema = blockToMerge.getDataSchema();
- Collection<Object[]> mergedBlockResultSet = mergedBlock.getSelectionResult();
- Collection<Object[]> blockToMergeResultSet = blockToMerge.getSelectionResult();
-
- if (mergedBlockSchema == null || mergedBlockResultSet.size() == 0) {
- // No data in merged block.
-
- // If block to merge schema is not null, set its data schema and result to the merged block.
- if (blockToMergeSchema != null) {
- mergedBlock.setDataSchema(blockToMergeSchema);
- mergedBlock.setSelectionResult(blockToMergeResultSet);
- }
- } else {
- // Some data in merged block.
-
- boolean isSelectionOrderBy = queryContext.getOrderByExpressions() != null;
- int limit = queryContext.getLimit();
-
- // No need to merge if already got enough rows for selection only.
- if (!isSelectionOrderBy && mergedBlockResultSet.size() == limit) {
- return;
- }
-
- // Merge only if there are data in block to merge.
- if (blockToMergeSchema != null && blockToMergeResultSet.size() > 0) {
- if (mergedBlockSchema.isTypeCompatibleWith(blockToMergeSchema)) {
- // Two blocks are mergeable.
-
- // Upgrade the merged block schema if necessary.
- mergedBlockSchema.upgradeToCover(blockToMergeSchema);
-
- // Merge two blocks.
- if (isSelectionOrderBy) {
- // Combine selection order-by.
- SelectionOperatorUtils
- .mergeWithOrdering((PriorityQueue<Object[]>) mergedBlockResultSet, blockToMergeResultSet,
- queryContext.getOffset() + limit);
- } else {
- // Combine selection only.
- SelectionOperatorUtils.mergeWithoutOrdering(mergedBlockResultSet, blockToMergeResultSet, limit);
- }
- mergedBlock.setSelectionResult(mergedBlockResultSet);
- } else {
- // Two blocks are not mergeable.
-
- String errorMessage = "Data schema inconsistency between merged block schema: " + mergedBlockSchema
- + " and block to merge schema: " + blockToMergeSchema + ", drop block to merge";
- LOGGER.info(errorMessage);
- mergedBlock.addToProcessingExceptions(
- QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
- }
- }
- }
- }
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
similarity index 79%
rename from pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
index 3a670fe..bfbb6c4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.List;
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.exception.EarlyTerminationException;
@@ -60,30 +62,40 @@ public class CombineSlowOperatorsTest {
}
@Test
- public void testCombineOperator() {
+ public void testSelectionOnlyCombineOperator() {
List<Operator> operators = getOperators();
- CombineOperator combineOperator =
- new CombineOperator(operators, QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"),
- _executorService, TIMEOUT_MS);
+ SelectionOnlyCombineOperator combineOperator = new SelectionOnlyCombineOperator(operators,
+ QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"), _executorService, TIMEOUT_MS);
+ testCombineOperator(operators, combineOperator);
+ }
+
+ // NOTE: Skip the test for SelectionOrderByCombineOperator because it requires SelectionOrderByOperator for the early
+ // termination optimization.
+
+ @Test
+ public void testAggregationOnlyCombineOperator() {
+ List<Operator> operators = getOperators();
+ AggregationOnlyCombineOperator combineOperator = new AggregationOnlyCombineOperator(operators,
+ QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table"), _executorService, TIMEOUT_MS);
testCombineOperator(operators, combineOperator);
}
@Test
- public void testCombineGroupByOperator() {
+ public void testGroupByCombineOperator() {
List<Operator> operators = getOperators();
- CombineGroupByOperator combineGroupByOperator = new CombineGroupByOperator(operators,
+ GroupByCombineOperator combineOperator = new GroupByCombineOperator(operators,
QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"),
_executorService, TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
- testCombineOperator(operators, combineGroupByOperator);
+ testCombineOperator(operators, combineOperator);
}
@Test
- public void testCombineGroupByOrderByOperator() {
+ public void testGroupByOrderByCombineOperator() {
List<Operator> operators = getOperators();
- CombineGroupByOrderByOperator combineGroupByOrderByOperator = new CombineGroupByOrderByOperator(operators,
+ GroupByOrderByCombineOperator combineOperator = new GroupByOrderByCombineOperator(operators,
QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"),
_executorService, TIMEOUT_MS);
- testCombineOperator(operators, combineGroupByOrderByOperator);
+ testCombineOperator(operators, combineOperator);
}
/**
@@ -151,7 +163,7 @@ public class CombineSlowOperatorsTest {
@Override
public ExecutionStatistics getExecutionStatistics() {
- return new ExecutionStatistics();
+ return new ExecutionStatistics(0, 0, 0, 0);
}
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
new file mode 100644
index 0000000..ac4ecf5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.plan.CombinePlanNode;
+import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Test for {@link SelectionOnlyCombineOperator} and {@link SelectionOrderByCombineOperator}.
+ */
+public class SelectionCombineOperatorTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SelectionCombineEarlyTerminationTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME_PREFIX = "testSegment_";
+
+ // Create (MAX_NUM_THREADS_PER_QUERY * 2) segments so that each thread needs to process 2 segments
+ private static final int NUM_SEGMENTS = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 2;
+ private static final int NUM_RECORDS_PER_SEGMENT = 100;
+
+ private static final String INT_COLUMN = "intColumn";
+ private static final TableConfig TABLE_CONFIG =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ private static final Schema SCHEMA =
+ new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build();
+
+ private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+ private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
+
+ private List<IndexSegment> _indexSegments;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ _indexSegments = new ArrayList<>(NUM_SEGMENTS);
+ for (int i = 0; i < NUM_SEGMENTS; i++) {
+ _indexSegments.add(createSegment(i));
+ }
+ }
+
+ private IndexSegment createSegment(int index)
+ throws Exception {
+ int baseValue = index * NUM_RECORDS_PER_SEGMENT / 2;
+ List<GenericRow> records = new ArrayList<>(NUM_RECORDS_PER_SEGMENT);
+ for (int i = 0; i < NUM_RECORDS_PER_SEGMENT; i++) {
+ GenericRow record = new GenericRow();
+ record.putValue(INT_COLUMN, baseValue + i);
+ records.add(record);
+ }
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ String segmentName = SEGMENT_NAME_PREFIX + index;
+ segmentGeneratorConfig.setSegmentName(segmentName);
+ segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+ driver.build();
+
+ return ImmutableSegmentLoader.load(new File(TEMP_DIR, segmentName), ReadMode.mmap);
+ }
+
+ @Test
+ public void testSelectionLimit0() {
+ IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable LIMIT 0");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ assertNotNull(combineResult.getSelectionResult());
+ assertTrue(combineResult.getSelectionResult().isEmpty());
+ assertEquals(combineResult.getNumDocsScanned(), 0);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(), 0);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumSegmentsMatched(), 0);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ }
+
+ @Test
+ public void testSelectionOnly() {
+ IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ assertNotNull(combineResult.getSelectionResult());
+ assertEquals(combineResult.getSelectionResult().size(), 10);
+ // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
+ // segment.
+ long numDocsScanned = combineResult.getNumDocsScanned();
+ assertTrue(numDocsScanned >= 10 && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ int numSegmentsMatched = combineResult.getNumSegmentsMatched();
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+
+ combineResult = getCombineResult("SELECT * FROM testTable LIMIT 10000");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ assertNotNull(combineResult.getSelectionResult());
+ assertEquals(combineResult.getSelectionResult().size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ // Should not early-terminate
+ numDocsScanned = combineResult.getNumDocsScanned();
+ assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ }
+
+ @Test
+ public void testSelectionOrderBy() {
+ IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult();
+ assertNotNull(selectionResult);
+ assertEquals(selectionResult.size(), 10);
+ int expectedValue = 9;
+ while (!selectionResult.isEmpty()) {
+ assertEquals((int) selectionResult.poll()[0], expectedValue--);
+ }
+ // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
+ // segment.
+ long numDocsScanned = combineResult.getNumDocsScanned();
+ assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
+ && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ int numSegmentsMatched = combineResult.getNumSegmentsMatched();
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+
+ combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult();
+ assertNotNull(selectionResult);
+ assertEquals(selectionResult.size(), 10);
+ expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40;
+ while (!selectionResult.isEmpty()) {
+ assertEquals((int) selectionResult.poll()[0], expectedValue++);
+ }
+ // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
+ // segment.
+ numDocsScanned = combineResult.getNumDocsScanned();
+ assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
+ && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ numSegmentsMatched = combineResult.getNumSegmentsMatched();
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+
+ combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult();
+ assertNotNull(selectionResult);
+ assertEquals(selectionResult.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ // Should not early-terminate
+ numDocsScanned = combineResult.getNumDocsScanned();
+ assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ }
+
+ private IntermediateResultsBlock getCombineResult(String query) {
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(query);
+ List<PlanNode> planNodes = new ArrayList<>(NUM_SEGMENTS);
+ for (IndexSegment indexSegment : _indexSegments) {
+ planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext));
+ }
+ CombinePlanNode combinePlanNode =
+ new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ return combinePlanNode.run().nextBlock();
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ for (IndexSegment indexSegment : _indexSegments) {
+ indexSegment.destroy();
+ }
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index 4ec7fbc..ff32e25 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.queries;
import java.io.File;
import java.net.URL;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -68,7 +67,6 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest {
private static final String AVRO_DATA = "data" + File.separator + "test_data-sv.avro";
private static final String SEGMENT_NAME = "testTable_126164076_167572854";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "SingleValueQueriesTest");
- private static final int NUM_SEGMENTS = 2;
// Hard-coded query filter.
private static final String QUERY_FILTER =
@@ -127,15 +125,7 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest {
throws Exception {
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
_indexSegment = immutableSegment;
- int numSegments = getNumSegments();
- _indexSegments = new ArrayList<>(numSegments);
- for (int i = 0; i < numSegments; i++) {
- _indexSegments.add(immutableSegment);
- }
- }
-
- protected int getNumSegments() {
- return NUM_SEGMENTS;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
@AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
deleted file mode 100644
index bca78a0..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.queries;
-
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.core.operator.CombineOperator;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Early termination test for selection-only queries.
- */
-public class SelectionOnlyEarlyTerminationTest extends BaseSingleValueQueriesTest {
- private static final int NUM_DOCS_PER_SEGMENT = 30000;
- private static final int NUM_SERVERS = 2;
-
- /**
- * In order to ensure each thread is executing more than 1 segment, this test is against
- * (2 * MAX_NUM_THREADS_PER_QUERY) segments per server.
- */
- @Override
- protected int getNumSegments() {
- return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2;
- }
-
- /**
- * With early termination, selection-only query is scheduled with {@link CombineOperator#MAX_NUM_THREADS_PER_QUERY}
- * threads per server, and the total number of segments matched (segments with non-zero documents scanned) should be
- * the same as the total number of threads for each server.
- */
- @Test
- public void testSelectOnlyQuery() {
- int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY;
- int numSegmentsPerServer = getNumSegments();
-
- // LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480
- for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) {
- String query = String.format("SELECT column1, column7, column9, column6 FROM testTable LIMIT %d", limit);
- int numColumnsInSelection = 4;
- BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
- assertNotNull(brokerResponse.getSelectionResults());
- assertNull(brokerResponse.getResultTable());
- assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
- // NOTE: 'numSegmentsMatched' and 'numDocsScanned' could be in a range because when the CombineOperator second
- // phase merge early terminates, the operators might not finish scanning the documents
- long numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
- assertTrue(numSegmentsMatched >= NUM_SERVERS && numSegmentsMatched <= numThreadsPerServer * NUM_SERVERS);
- long numDocsScanned = brokerResponse.getNumDocsScanned();
- assertTrue(numDocsScanned >= NUM_SERVERS * limit && numDocsScanned <= numThreadsPerServer * NUM_SERVERS * limit);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), numDocsScanned * numColumnsInSelection);
- // Total number of documents should not be affected by early-termination
- assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-
- brokerResponse = getBrokerResponseForSqlQuery(query);
- assertNull(brokerResponse.getSelectionResults());
- assertNotNull(brokerResponse.getResultTable());
- assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
- // NOTE: 'numSegmentsMatched' and 'numDocsScanned' could be in a range because when the CombineOperator second
- // phase merge early terminates, the operators might not finish scanning the documents
- numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
- assertTrue(numSegmentsMatched >= NUM_SERVERS && numSegmentsMatched <= numThreadsPerServer * NUM_SERVERS);
- numDocsScanned = brokerResponse.getNumDocsScanned();
- assertTrue(numDocsScanned >= NUM_SERVERS * limit && numDocsScanned <= numThreadsPerServer * NUM_SERVERS * limit);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), numDocsScanned * numColumnsInSelection);
- // Total number of documents should not be affected by early-termination
- assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
- }
- }
-
- /**
- * Without early termination, selection order-by query should hit all segments.
- */
- @Test
- public void testSelectWithOrderByQuery() {
- int numSegmentsPerServer = getNumSegments();
- String query = "SELECT column11, column18, column1 FROM testTable ORDER BY column11";
- BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
- assertNotNull(brokerResponse.getSelectionResults());
- assertNull(brokerResponse.getResultTable());
- assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
- assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer * NUM_SERVERS);
- assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 non-order-by columns) per segment
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
- brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * NUM_SERVERS);
- assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
-
- brokerResponse = getBrokerResponseForSqlQuery(query);
- assertNull(brokerResponse.getSelectionResults());
- assertNotNull(brokerResponse.getResultTable());
- assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS);
- assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer * NUM_SERVERS);
- assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 non-order-by columns) per segment
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
- brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * NUM_SERVERS);
- assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org