You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/13 02:29:29 UTC
[pinot] branch master updated: Terminate the query after plan generation if timeout (#9386)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a5fd21cf9 Terminate the query after plan generation if timeout (#9386)
6a5fd21cf9 is described below
commit 6a5fd21cf9253b7ff67e20f94a33abc978f61708
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Mon Sep 12 19:29:21 2022 -0700
Terminate the query after plan generation if timeout (#9386)
* Terminate the query after plan generation if the query has already timed out
* Use TimeoutException
* Use TimeoutException
* Update error message
---
.../apache/pinot/core/plan/GlobalPlanImplV0.java | 7 +++++-
.../main/java/org/apache/pinot/core/plan/Plan.java | 4 +++-
.../org/apache/pinot/queries/BaseQueriesTest.java | 28 +++++++++++++++++-----
3 files changed, 31 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
index d0f06c18f9..832b9beede 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.plan;
+import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.operator.InstanceResponseOperator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
@@ -45,11 +46,15 @@ public class GlobalPlanImplV0 implements Plan {
}
@Override
- public DataTable execute() {
+ public DataTable execute()
+ throws TimeoutException {
long startTime = System.currentTimeMillis();
InstanceResponseOperator instanceResponseOperator = _instanceResponsePlanNode.run();
long endTime1 = System.currentTimeMillis();
LOGGER.debug("InstanceResponsePlanNode.run() took: {}ms", endTime1 - startTime);
+ if (endTime1 > _instanceResponsePlanNode._queryContext.getEndTimeMs()) {
+ throw new TimeoutException("Query timed out while generating physical execution plan");
+ }
InstanceResponseBlock instanceResponseBlock = instanceResponseOperator.nextBlock();
long endTime2 = System.currentTimeMillis();
LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 - endTime1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
index 6ea877064d..00ffe0674e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.plan;
+import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
@@ -32,5 +33,6 @@ public interface Plan {
PlanNode getPlanNode();
/** Execute the query plan and get the instance response. */
- DataTable execute();
+ DataTable execute()
+ throws TimeoutException;
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 2ce3d903d7..e73069eb69 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
@@ -196,8 +197,13 @@ public abstract class BaseQueriesTest {
// Server side
serverQueryContext.setEndTimeMs(System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
Plan plan = planMaker.makeInstancePlan(getIndexSegments(), serverQueryContext, EXECUTOR_SERVICE, null);
- DataTable instanceResponse =
- queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute();
+ DataTable instanceResponse;
+ try {
+ instanceResponse =
+ queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute();
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
// Broker side
// Use 2 Threads for 2 data-tables
@@ -264,10 +270,20 @@ public abstract class BaseQueriesTest {
Plan plan1 = planMaker.makeInstancePlan(instances.get(0), serverQueryContext, EXECUTOR_SERVICE, null);
Plan plan2 = planMaker.makeInstancePlan(instances.get(1), serverQueryContext, EXECUTOR_SERVICE, null);
- DataTable instanceResponse1 =
- queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute();
- DataTable instanceResponse2 =
- queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute();
+ DataTable instanceResponse1;
+ try {
+ instanceResponse1 =
+ queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute();
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ DataTable instanceResponse2;
+ try {
+ instanceResponse2 =
+ queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute();
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
// Broker side
// Use 2 Threads for 2 data-tables
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org