You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/08 21:32:27 UTC
[incubator-pinot] branch master updated: Use query timeout for
planning phase (#5990)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cd35332 Use query timeout for planning phase (#5990)
cd35332 is described below
commit cd35332a51c0c4997e8fdffca0dad9d090d0cf66
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 8 14:32:08 2020 -0700
Use query timeout for planning phase (#5990)
Currently the `CombinePlanNode` uses a fixed 10-second timeout for the multi-threaded query planning, and the query planning time is not counted toward the query timeout.
This PR:
- Replaces the fixed 10-second timeout with the query timeout for the query planning phase
- Counts the query planning time toward the query timeout
---
.../combine/AggregationOnlyCombineOperator.java | 4 ++--
.../core/operator/combine/BaseCombineOperator.java | 10 ++++-----
.../operator/combine/GroupByCombineOperator.java | 11 +++++-----
.../combine/GroupByOrderByCombineOperator.java | 11 +++++-----
.../combine/SelectionOnlyCombineOperator.java | 4 ++--
.../combine/SelectionOrderByCombineOperator.java | 9 +++-----
.../apache/pinot/core/plan/CombinePlanNode.java | 25 +++++++++-------------
.../core/plan/maker/InstancePlanMakerImplV2.java | 4 ++--
.../apache/pinot/core/plan/maker/PlanMaker.java | 2 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 10 ++++-----
.../combine/SelectionCombineOperatorTest.java | 6 ++++--
.../pinot/core/plan/CombinePlanNodeTest.java | 16 ++++++++------
.../org/apache/pinot/queries/BaseQueriesTest.java | 4 ++--
.../apache/pinot/queries/DistinctQueriesTest.java | 8 +++----
14 files changed, 60 insertions(+), 64 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
index 83a474e..c3ec21b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
@@ -34,8 +34,8 @@ public class AggregationOnlyCombineOperator extends BaseCombineOperator {
private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator";
public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
- ExecutorService executorService, long timeOutMs) {
- super(operators, queryContext, executorService, timeOutMs);
+ ExecutorService executorService, long endTimeMs) {
+ super(operators, queryContext, executorService, endTimeMs);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 2d10a35..7e60844 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -50,20 +50,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
protected final List<Operator> _operators;
protected final QueryContext _queryContext;
protected final ExecutorService _executorService;
- protected final long _timeOutMs;
+ protected final long _endTimeMs;
public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
- long timeOutMs) {
+ long endTimeMs) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
- _timeOutMs = timeOutMs;
+ _endTimeMs = endTimeMs;
}
@Override
protected IntermediateResultsBlock getNextBlock() {
- long startTimeMs = System.currentTimeMillis();
- long endTimeMs = startTimeMs + _timeOutMs;
int numOperators = _operators.size();
int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
@@ -124,7 +122,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
int numBlocksMerged = 0;
while (numBlocksMerged < numOperators) {
IntermediateResultsBlock blockToMerge =
- blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
// Query times out, skip merging the remaining results blocks
LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 2b54984..ff84f39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -67,17 +67,17 @@ public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBloc
private final List<Operator> _operators;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
- private final long _timeOutMs;
+ private final long _endTimeMs;
// Limit on number of groups stored, beyond which no new group will be created
private final int _innerSegmentNumGroupsLimit;
private final int _interSegmentNumGroupsLimit;
public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
- long timeOutMs, int innerSegmentNumGroupsLimit) {
+ long endTimeMs, int innerSegmentNumGroupsLimit) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
- _timeOutMs = timeOutMs;
+ _endTimeMs = endTimeMs;
_innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
_interSegmentNumGroupsLimit =
(int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE);
@@ -189,11 +189,12 @@ public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBloc
}
try {
- boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
+ long timeoutMs = _endTimeMs - System.currentTimeMillis();
+ boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log the error and return
String errorMessage = String
- .format("Timed out while combining group-by results after %dms, queryContext = %s", _timeOutMs,
+ .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 8fb174c..37af764 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -66,18 +66,18 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
private final List<Operator> _operators;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
- private final long _timeOutMs;
+ private final long _endTimeMs;
private final int _indexedTableCapacity;
private final Lock _initLock;
private DataSchema _dataSchema;
private ConcurrentIndexedTable _indexedTable;
public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
- ExecutorService executorService, long timeOutMs) {
+ ExecutorService executorService, long endTimeMs) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
- _timeOutMs = timeOutMs;
+ _endTimeMs = endTimeMs;
_initLock = new ReentrantLock();
_indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext);
}
@@ -220,11 +220,12 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
}
try {
- boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
+ long timeoutMs = _endTimeMs - System.currentTimeMillis();
+ boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log the error and return
String errorMessage = String
- .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", _timeOutMs,
+ .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
index 07aad33..2da4c56 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
@@ -42,8 +42,8 @@ public class SelectionOnlyCombineOperator extends BaseCombineOperator {
private final int _numRowsToKeep;
public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
- ExecutorService executorService, long timeOutMs) {
- super(operators, queryContext, executorService, timeOutMs);
+ ExecutorService executorService, long endTimeMs) {
+ super(operators, queryContext, executorService, endTimeMs);
_numRowsToKeep = queryContext.getLimit();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index af5b18e..f95908f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -69,8 +69,8 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
private final int _numRowsToKeep;
public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
- ExecutorService executorService, long timeOutMs) {
- super(operators, queryContext, executorService, timeOutMs);
+ ExecutorService executorService, long endTimeMs) {
+ super(operators, queryContext, executorService, endTimeMs);
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
}
@@ -91,9 +91,6 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
}
private IntermediateResultsBlock minMaxValueBasedCombine() {
- long startTimeMs = System.currentTimeMillis();
- long endTimeMs = startTimeMs + _timeOutMs;
-
List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
assert orderByExpressions != null;
int numOrderByExpressions = orderByExpressions.size();
@@ -270,7 +267,7 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
int numBlocksMerged = 0;
while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
IntermediateResultsBlock blockToMerge =
- blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
// Query times out, skip merging the remaining results blocks
LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 4f01fea..b6d9155 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -51,12 +51,10 @@ public class CombinePlanNode implements PlanNode {
// Try to schedule 10 plans for each thread, or evenly distribute plans to all MAX_NUM_THREADS_PER_QUERY threads
private static final int TARGET_NUM_PLANS_PER_THREAD = 10;
- private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
-
private final List<PlanNode> _planNodes;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
- private final long _timeOutMs;
+ private final long _endTimeMs;
private final int _numGroupsLimit;
/**
@@ -65,15 +63,15 @@ public class CombinePlanNode implements PlanNode {
* @param planNodes List of underlying plan nodes
* @param queryContext Query context
* @param executorService Executor service
- * @param timeOutMs Time out in milliseconds for query execution (not for planning phase)
+ * @param endTimeMs End time in milliseconds for the query
* @param numGroupsLimit Limit of number of groups stored in each segment
*/
public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
- long timeOutMs, int numGroupsLimit) {
+ long endTimeMs, int numGroupsLimit) {
_planNodes = planNodes;
_queryContext = queryContext;
_executorService = executorService;
- _timeOutMs = timeOutMs;
+ _endTimeMs = endTimeMs;
_numGroupsLimit = numGroupsLimit;
}
@@ -91,9 +89,6 @@ public class CombinePlanNode implements PlanNode {
} else {
// Large number of plan nodes, run them in parallel
- // Calculate the time out timestamp
- long endTimeMs = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
-
int numThreads = Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / TARGET_NUM_PLANS_PER_THREAD,
MAX_NUM_THREADS_PER_QUERY);
@@ -136,7 +131,7 @@ public class CombinePlanNode implements PlanNode {
try {
for (Future future : futures) {
List<Operator> ops =
- (List<Operator>) future.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ (List<Operator>) future.get(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
operators.addAll(ops);
}
} catch (Exception e) {
@@ -163,22 +158,22 @@ public class CombinePlanNode implements PlanNode {
if (QueryContextUtils.isAggregationQuery(_queryContext)) {
if (_queryContext.getGroupByExpressions() == null) {
// Aggregation only
- return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
} else {
// Aggregation group-by
QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
if (queryOptions.isGroupByModeSQL()) {
- return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
}
- return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
+ return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit);
}
} else {
if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
// Selection only
- return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
} else {
// Selection order-by
- return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+ return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index caae551..746b08a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -97,13 +97,13 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
@Override
public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
- ExecutorService executorService, long timeOutMs) {
+ ExecutorService executorService, long endTimeMs) {
List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
for (IndexSegment indexSegment : indexSegments) {
planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
}
CombinePlanNode combinePlanNode =
- new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit);
+ new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit);
return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
index 553a6d8..b4a316d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
@@ -37,7 +37,7 @@ public interface PlanMaker {
* Returns an instance level {@link Plan} which contains the logical execution plan for multiple segments.
*/
Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, ExecutorService executorService,
- long timeoutMs);
+ long endTimeMs);
/**
* Returns a segment level {@link PlanNode} which contains the logical execution plan for one segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 46d2579..df29292 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -100,7 +100,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
if (schedulerWaitTimer != null) {
schedulerWaitTimer.stopAndRecord();
}
- long querySchedulingTimeMs = System.currentTimeMillis() - timerContext.getQueryArrivalTimeMs();
+ long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
+ long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
long requestId = queryRequest.getRequestId();
@@ -116,10 +117,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
queryTimeoutMs = timeoutFromQueryOptions;
}
}
- long remainingTimeMs = queryTimeoutMs - querySchedulingTimeMs;
// Query scheduler wait time already exceeds query timeout, directly return
- if (remainingTimeMs <= 0) {
+ if (querySchedulingTimeMs >= queryTimeoutMs) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
String errorMessage = String
.format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
@@ -213,8 +213,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
indexSegments.add(segmentDataManager.getSegment());
}
- Plan globalQueryPlan =
- _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, remainingTimeMs);
+ long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
+ Plan globalQueryPlan = _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
planBuildTimer.stopAndRecord();
TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index ac4ecf5..5ee3da2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -227,8 +228,9 @@ public class SelectionCombineOperatorTest {
for (IndexSegment indexSegment : _indexSegments) {
planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext));
}
- CombinePlanNode combinePlanNode =
- new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR,
+ System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
return combinePlanNode.run().nextBlock();
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
index ed8fb6e..b0d5d13 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
+import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -55,7 +56,8 @@ public class CombinePlanNodeTest {
return null;
});
}
- CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 1000,
+ CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
+ System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
combinePlanNode.run();
Assert.assertEquals(numPlans, count.get());
@@ -64,15 +66,13 @@ public class CombinePlanNodeTest {
@Test
public void testSlowPlanNode() {
- // Warning: this test is slow (take 10 seconds).
-
AtomicBoolean notInterrupted = new AtomicBoolean();
List<PlanNode> planNodes = new ArrayList<>();
for (int i = 0; i < 20; i++) {
planNodes.add(() -> {
try {
- Thread.sleep(20000);
+ Thread.sleep(10000);
} catch (InterruptedException e) {
// Thread should be interrupted
throw new RuntimeException(e);
@@ -81,8 +81,9 @@ public class CombinePlanNodeTest {
return null;
});
}
- CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ CombinePlanNode combinePlanNode =
+ new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100,
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
try {
combinePlanNode.run();
} catch (RuntimeException e) {
@@ -102,7 +103,8 @@ public class CombinePlanNodeTest {
throw new RuntimeException("Inner exception message.");
});
}
- CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0,
+ CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
+ System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
try {
combinePlanNode.run();
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 619302c..86c144d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -198,8 +198,8 @@ public abstract class BaseQueriesTest {
*/
private BrokerResponseNative getBrokerResponse(QueryContext queryContext, PlanMaker planMaker) {
// Server side.
- Plan plan = planMaker
- .makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+ Plan plan = planMaker.makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE,
+ System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
DataTable instanceResponse = plan.execute();
// Broker side.
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 19afd73..f419cfd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -35,7 +35,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
@@ -305,7 +305,6 @@ public class DistinctQueriesTest extends BaseQueriesTest {
}
}
-
/**
* Test DISTINCT query within a single segment.
* <p>The following query types are tested:
@@ -759,6 +758,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
};
testDistinctInterSegmentHelper(pqlQueries, sqlQueries);
}
+
/**
* Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have
* 2 copies of segment1.
@@ -768,10 +768,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
// Server side
DataTable instanceResponse0 = PLAN_MAKER
.makeInstancePlan(Arrays.asList(segment0, segment0), queryContext, EXECUTOR_SERVICE,
- CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+ System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
DataTable instanceResponse1 = PLAN_MAKER
.makeInstancePlan(Arrays.asList(segment1, segment1), queryContext, EXECUTOR_SERVICE,
- CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+ System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
// Broker side
BrokerReduceService brokerReduceService = new BrokerReduceService();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org