You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/08 21:32:27 UTC

[incubator-pinot] branch master updated: Use query timeout for planning phase (#5990)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cd35332  Use query timeout for planning phase (#5990)
cd35332 is described below

commit cd35332a51c0c4997e8fdffca0dad9d090d0cf66
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 8 14:32:08 2020 -0700

    Use query timeout for planning phase (#5990)
    
    Currently the `CombinePlanNode` uses a fixed 10 seconds as the timeout for the multi-threaded query planning, and the query planning time is not counted toward the query timeout.
    This PR:
    - Replaces the fixed 10 seconds timeout with the query timeout for the query planning phase
    - Counts the query planning time toward the query timeout
---
 .../combine/AggregationOnlyCombineOperator.java    |  4 ++--
 .../core/operator/combine/BaseCombineOperator.java | 10 ++++-----
 .../operator/combine/GroupByCombineOperator.java   | 11 +++++-----
 .../combine/GroupByOrderByCombineOperator.java     | 11 +++++-----
 .../combine/SelectionOnlyCombineOperator.java      |  4 ++--
 .../combine/SelectionOrderByCombineOperator.java   |  9 +++-----
 .../apache/pinot/core/plan/CombinePlanNode.java    | 25 +++++++++-------------
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  4 ++--
 .../apache/pinot/core/plan/maker/PlanMaker.java    |  2 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  | 10 ++++-----
 .../combine/SelectionCombineOperatorTest.java      |  6 ++++--
 .../pinot/core/plan/CombinePlanNodeTest.java       | 16 ++++++++------
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  4 ++--
 .../apache/pinot/queries/DistinctQueriesTest.java  |  8 +++----
 14 files changed, 60 insertions(+), 64 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
index 83a474e..c3ec21b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java
@@ -34,8 +34,8 @@ public class AggregationOnlyCombineOperator extends BaseCombineOperator {
   private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator";
 
   public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
-      ExecutorService executorService, long timeOutMs) {
-    super(operators, queryContext, executorService, timeOutMs);
+      ExecutorService executorService, long endTimeMs) {
+    super(operators, queryContext, executorService, endTimeMs);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 2d10a35..7e60844 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -50,20 +50,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
   protected final List<Operator> _operators;
   protected final QueryContext _queryContext;
   protected final ExecutorService _executorService;
-  protected final long _timeOutMs;
+  protected final long _endTimeMs;
 
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
-      long timeOutMs) {
+      long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
-    _timeOutMs = timeOutMs;
+    _endTimeMs = endTimeMs;
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
-    long startTimeMs = System.currentTimeMillis();
-    long endTimeMs = startTimeMs + _timeOutMs;
     int numOperators = _operators.size();
     int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
 
@@ -124,7 +122,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
       int numBlocksMerged = 0;
       while (numBlocksMerged < numOperators) {
         IntermediateResultsBlock blockToMerge =
-            blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+            blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
         if (blockToMerge == null) {
           // Query times out, skip merging the remaining results blocks
           LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 2b54984..ff84f39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -67,17 +67,17 @@ public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBloc
   private final List<Operator> _operators;
   private final QueryContext _queryContext;
   private final ExecutorService _executorService;
-  private final long _timeOutMs;
+  private final long _endTimeMs;
   // Limit on number of groups stored, beyond which no new group will be created
   private final int _innerSegmentNumGroupsLimit;
   private final int _interSegmentNumGroupsLimit;
 
   public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
-      long timeOutMs, int innerSegmentNumGroupsLimit) {
+      long endTimeMs, int innerSegmentNumGroupsLimit) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
-    _timeOutMs = timeOutMs;
+    _endTimeMs = endTimeMs;
     _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
     _interSegmentNumGroupsLimit =
         (int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE);
@@ -189,11 +189,12 @@ public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBloc
     }
 
     try {
-      boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
+      long timeoutMs = _endTimeMs - System.currentTimeMillis();
+      boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
       if (!opCompleted) {
         // If this happens, the broker side should already timed out, just log the error and return
         String errorMessage = String
-            .format("Timed out while combining group-by results after %dms, queryContext = %s", _timeOutMs,
+            .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs,
                 _queryContext);
         LOGGER.error(errorMessage);
         return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 8fb174c..37af764 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -66,18 +66,18 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
   private final List<Operator> _operators;
   private final QueryContext _queryContext;
   private final ExecutorService _executorService;
-  private final long _timeOutMs;
+  private final long _endTimeMs;
   private final int _indexedTableCapacity;
   private final Lock _initLock;
   private DataSchema _dataSchema;
   private ConcurrentIndexedTable _indexedTable;
 
   public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
-      ExecutorService executorService, long timeOutMs) {
+      ExecutorService executorService, long endTimeMs) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
-    _timeOutMs = timeOutMs;
+    _endTimeMs = endTimeMs;
     _initLock = new ReentrantLock();
     _indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext);
   }
@@ -220,11 +220,12 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
     }
 
     try {
-      boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
+      long timeoutMs = _endTimeMs - System.currentTimeMillis();
+      boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
       if (!opCompleted) {
         // If this happens, the broker side should already timed out, just log the error and return
         String errorMessage = String
-            .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", _timeOutMs,
+            .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
                 _queryContext);
         LOGGER.error(errorMessage);
         return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
index 07aad33..2da4c56 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
@@ -42,8 +42,8 @@ public class SelectionOnlyCombineOperator extends BaseCombineOperator {
   private final int _numRowsToKeep;
 
   public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
-      ExecutorService executorService, long timeOutMs) {
-    super(operators, queryContext, executorService, timeOutMs);
+      ExecutorService executorService, long endTimeMs) {
+    super(operators, queryContext, executorService, endTimeMs);
     _numRowsToKeep = queryContext.getLimit();
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index af5b18e..f95908f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -69,8 +69,8 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
   private final int _numRowsToKeep;
 
   public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
-      ExecutorService executorService, long timeOutMs) {
-    super(operators, queryContext, executorService, timeOutMs);
+      ExecutorService executorService, long endTimeMs) {
+    super(operators, queryContext, executorService, endTimeMs);
     _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
   }
 
@@ -91,9 +91,6 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
   }
 
   private IntermediateResultsBlock minMaxValueBasedCombine() {
-    long startTimeMs = System.currentTimeMillis();
-    long endTimeMs = startTimeMs + _timeOutMs;
-
     List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
     assert orderByExpressions != null;
     int numOrderByExpressions = orderByExpressions.size();
@@ -270,7 +267,7 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
       int numBlocksMerged = 0;
       while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
         IntermediateResultsBlock blockToMerge =
-            blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+            blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
         if (blockToMerge == null) {
           // Query times out, skip merging the remaining results blocks
           LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 4f01fea..b6d9155 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -51,12 +51,10 @@ public class CombinePlanNode implements PlanNode {
   // Try to schedule 10 plans for each thread, or evenly distribute plans to all MAX_NUM_THREADS_PER_QUERY threads
   private static final int TARGET_NUM_PLANS_PER_THREAD = 10;
 
-  private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
-
   private final List<PlanNode> _planNodes;
   private final QueryContext _queryContext;
   private final ExecutorService _executorService;
-  private final long _timeOutMs;
+  private final long _endTimeMs;
   private final int _numGroupsLimit;
 
   /**
@@ -65,15 +63,15 @@ public class CombinePlanNode implements PlanNode {
    * @param planNodes List of underlying plan nodes
    * @param queryContext Query context
    * @param executorService Executor service
-   * @param timeOutMs Time out in milliseconds for query execution (not for planning phase)
+   * @param endTimeMs End time in milliseconds for the query
    * @param numGroupsLimit Limit of number of groups stored in each segment
    */
   public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
-      long timeOutMs, int numGroupsLimit) {
+      long endTimeMs, int numGroupsLimit) {
     _planNodes = planNodes;
     _queryContext = queryContext;
     _executorService = executorService;
-    _timeOutMs = timeOutMs;
+    _endTimeMs = endTimeMs;
     _numGroupsLimit = numGroupsLimit;
   }
 
@@ -91,9 +89,6 @@ public class CombinePlanNode implements PlanNode {
     } else {
       // Large number of plan nodes, run them in parallel
 
-      // Calculate the time out timestamp
-      long endTimeMs = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
-
       int numThreads = Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / TARGET_NUM_PLANS_PER_THREAD,
           MAX_NUM_THREADS_PER_QUERY);
 
@@ -136,7 +131,7 @@ public class CombinePlanNode implements PlanNode {
       try {
         for (Future future : futures) {
           List<Operator> ops =
-              (List<Operator>) future.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+              (List<Operator>) future.get(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
           operators.addAll(ops);
         }
       } catch (Exception e) {
@@ -163,22 +158,22 @@ public class CombinePlanNode implements PlanNode {
     if (QueryContextUtils.isAggregationQuery(_queryContext)) {
       if (_queryContext.getGroupByExpressions() == null) {
         // Aggregation only
-        return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+        return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
       } else {
         // Aggregation group-by
         QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
         if (queryOptions.isGroupByModeSQL()) {
-          return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+          return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
         }
-        return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
+        return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit);
       }
     } else {
       if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
         // Selection only
-        return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+        return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
       } else {
         // Selection order-by
-        return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
+        return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index caae551..746b08a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -97,13 +97,13 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
 
   @Override
   public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
-      ExecutorService executorService, long timeOutMs) {
+      ExecutorService executorService, long endTimeMs) {
     List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
     for (IndexSegment indexSegment : indexSegments) {
       planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
     }
     CombinePlanNode combinePlanNode =
-        new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit);
+        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit);
     return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
index 553a6d8..b4a316d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
@@ -37,7 +37,7 @@ public interface PlanMaker {
    * Returns an instance level {@link Plan} which contains the logical execution plan for multiple segments.
    */
   Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, ExecutorService executorService,
-      long timeoutMs);
+      long endTimeMs);
 
   /**
    * Returns a segment level {@link PlanNode} which contains the logical execution plan for one segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 46d2579..df29292 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -100,7 +100,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     if (schedulerWaitTimer != null) {
       schedulerWaitTimer.stopAndRecord();
     }
-    long querySchedulingTimeMs = System.currentTimeMillis() - timerContext.getQueryArrivalTimeMs();
+    long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
+    long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
     TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
 
     long requestId = queryRequest.getRequestId();
@@ -116,10 +117,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
         queryTimeoutMs = timeoutFromQueryOptions;
       }
     }
-    long remainingTimeMs = queryTimeoutMs - querySchedulingTimeMs;
 
     // Query scheduler wait time already exceeds query timeout, directly return
-    if (remainingTimeMs <= 0) {
+    if (querySchedulingTimeMs >= queryTimeoutMs) {
       _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
       String errorMessage = String
           .format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
@@ -213,8 +213,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
         for (SegmentDataManager segmentDataManager : segmentDataManagers) {
           indexSegments.add(segmentDataManager.getSegment());
         }
-        Plan globalQueryPlan =
-            _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, remainingTimeMs);
+        long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
+        Plan globalQueryPlan = _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
         planBuildTimer.stopAndRecord();
 
         TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index ac4ecf5..5ee3da2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -227,8 +228,9 @@ public class SelectionCombineOperatorTest {
     for (IndexSegment indexSegment : _indexSegments) {
       planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext));
     }
-    CombinePlanNode combinePlanNode =
-        new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR,
+        System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
+        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
     return combinePlanNode.run().nextBlock();
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
index ed8fb6e..b0d5d13 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
+import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -55,7 +56,8 @@ public class CombinePlanNodeTest {
           return null;
         });
       }
-      CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 1000,
+      CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
+          System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
           InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
       combinePlanNode.run();
       Assert.assertEquals(numPlans, count.get());
@@ -64,15 +66,13 @@ public class CombinePlanNodeTest {
 
   @Test
   public void testSlowPlanNode() {
-    // Warning: this test is slow (take 10 seconds).
-
     AtomicBoolean notInterrupted = new AtomicBoolean();
 
     List<PlanNode> planNodes = new ArrayList<>();
     for (int i = 0; i < 20; i++) {
       planNodes.add(() -> {
         try {
-          Thread.sleep(20000);
+          Thread.sleep(10000);
         } catch (InterruptedException e) {
           // Thread should be interrupted
           throw new RuntimeException(e);
@@ -81,8 +81,9 @@ public class CombinePlanNodeTest {
         return null;
       });
     }
-    CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0,
-        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100,
+            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
     try {
       combinePlanNode.run();
     } catch (RuntimeException e) {
@@ -102,7 +103,8 @@ public class CombinePlanNodeTest {
         throw new RuntimeException("Inner exception message.");
       });
     }
-    CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0,
+    CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
+        System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
         InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
     try {
       combinePlanNode.run();
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 619302c..86c144d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -198,8 +198,8 @@ public abstract class BaseQueriesTest {
    */
   private BrokerResponseNative getBrokerResponse(QueryContext queryContext, PlanMaker planMaker) {
     // Server side.
-    Plan plan = planMaker
-        .makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+    Plan plan = planMaker.makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE,
+        System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
     DataTable instanceResponse = plan.execute();
 
     // Broker side.
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 19afd73..f419cfd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -35,7 +35,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
@@ -305,7 +305,6 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     }
   }
 
-
   /**
    * Test DISTINCT query within a single segment.
    * <p>The following query types are tested:
@@ -759,6 +758,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     };
     testDistinctInterSegmentHelper(pqlQueries, sqlQueries);
   }
+
   /**
    * Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have
    * 2 copies of segment1.
@@ -768,10 +768,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     // Server side
     DataTable instanceResponse0 = PLAN_MAKER
         .makeInstancePlan(Arrays.asList(segment0, segment0), queryContext, EXECUTOR_SERVICE,
-            CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+            System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
     DataTable instanceResponse1 = PLAN_MAKER
         .makeInstancePlan(Arrays.asList(segment1, segment1), queryContext, EXECUTOR_SERVICE,
-            CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+            System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
 
     // Broker side
     BrokerReduceService brokerReduceService = new BrokerReduceService();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org