You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/07/24 23:31:44 UTC

[pinot] branch master updated: [multistage] support physical plan in Explain queries (#11052)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4135e07220 [multistage] support physical plan in Explain queries (#11052)
4135e07220 is described below

commit 4135e072201a5c15dbe972fb12bae8ce3b349052
Author: Abhishek Sharma <ab...@spothero.com>
AuthorDate: Mon Jul 24 19:31:39 2023 -0400

    [multistage] support physical plan in Explain queries (#11052)
    
    * separated physical and logical plan explain.
    * added SqlPhysicalExplain node and syntax support changes.
    * Renamed PhysicalExplainPlanVisitor class and other minor PR comments fix.
---
 .../MultiStageBrokerRequestHandler.java            |  2 +-
 pinot-common/src/main/codegen/config.fmpp          |  1 +
 .../src/main/codegen/includes/parserImpls.ftl      | 31 ++++++++
 .../sql/parsers/parser/SqlPhysicalExplain.java     | 36 ++++++++++
 .../org/apache/pinot/query/QueryEnvironment.java   | 43 ++++++-----
 ...isitor.java => PhysicalExplainPlanVisitor.java} | 11 ++-
 .../query/planner/plannode/PlanNodeVisitor.java    |  4 +-
 .../apache/pinot/query/QueryCompilationTest.java   | 84 +++++++++++++++++++---
 .../pinot/query/QueryEnvironmentTestBase.java      |  3 +
 .../query/queries/ResourceBasedQueryPlansTest.java |  6 +-
 .../query/service/dispatch/QueryDispatcher.java    |  5 +-
 11 files changed, 185 insertions(+), 41 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 2f10ed7e37..27e48e2ed7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -165,7 +165,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       compilationStartTimeNs = System.nanoTime();
       switch (sqlNodeAndOptions.getSqlNode().getKind()) {
         case EXPLAIN:
-          queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions);
+          queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
           String plan = queryPlanResult.getExplainPlan();
           Set<String> tableNames = queryPlanResult.getTableNames();
           if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) {
diff --git a/pinot-common/src/main/codegen/config.fmpp b/pinot-common/src/main/codegen/config.fmpp
index e6955766bc..178029a3b8 100644
--- a/pinot-common/src/main/codegen/config.fmpp
+++ b/pinot-common/src/main/codegen/config.fmpp
@@ -526,6 +526,7 @@ data: {
     # List of extended statement syntax to add
     statementParserMethods: [
       "SqlInsertFromFile()"
+      "SqlPhysicalExplain()"
     ]
 
     # List of custom function syntax to add
diff --git a/pinot-common/src/main/codegen/includes/parserImpls.ftl b/pinot-common/src/main/codegen/includes/parserImpls.ftl
index 449d8ab3b9..79c1a30f44 100644
--- a/pinot-common/src/main/codegen/includes/parserImpls.ftl
+++ b/pinot-common/src/main/codegen/includes/parserImpls.ftl
@@ -119,3 +119,34 @@ void SqlAtTimeZone(List<Object> list, ExprContext exprContext, Span s) :
         list.addAll(list2);
     }
 }
+
+SqlNode SqlPhysicalExplain() :
+{
+    SqlNode stmt;
+    SqlExplainLevel detailLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES; // default when no detail level is given
+    SqlExplain.Depth depth = SqlExplain.Depth.PHYSICAL; // never reassigned in this production
+    final SqlExplainFormat format;
+}
+{
+    <EXPLAIN> <IMPLEMENTATION> <PLAN>
+    [ detailLevel = ExplainDetailLevel() ]
+    (
+        LOOKAHEAD(2)
+        <AS> <XML> { format = SqlExplainFormat.XML; }
+    |
+        LOOKAHEAD(2)
+        <AS> <JSON> { format = SqlExplainFormat.JSON; }
+    |
+        <AS> <DOT_FORMAT> { format = SqlExplainFormat.DOT; }
+    |
+        { format = SqlExplainFormat.TEXT; } // no AS clause: format falls back to TEXT
+    )
+    <FOR> stmt = SqlQueryOrDml() {
+        return new SqlPhysicalExplain(getPos(),
+            stmt,
+            detailLevel.symbol(SqlParserPos.ZERO),
+            depth.symbol(SqlParserPos.ZERO),
+            format.symbol(SqlParserPos.ZERO),
+            nDynamicParams);
+    }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/parser/SqlPhysicalExplain.java b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/parser/SqlPhysicalExplain.java
new file mode 100644
index 0000000000..2062a5af67
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/parser/SqlPhysicalExplain.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.parser;
+
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Calcite {@link SqlExplain} node created for EXPLAIN IMPLEMENTATION queries. It adds no state or behavior of its
+ * own: the constructor only forwards its arguments to {@link SqlExplain}.
+ * <p>Syntax: EXPLAIN IMPLEMENTATION PLAN [ [INCLUDING | EXCLUDING] [ALL] ATTRIBUTES ] [AS XML|JSON|DOT] FOR query
+ */
+public class SqlPhysicalExplain extends SqlExplain {
+  public SqlPhysicalExplain(SqlParserPos pos, SqlNode explicandum, SqlLiteral detailLevel, SqlLiteral depth,
+      SqlLiteral format, int dynamicParameterCount) {
+    super(pos, explicandum, detailLevel, depth, format, dynamicParameterCount);
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 0201ef682c..5963ba5f98 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -61,6 +61,7 @@ import org.apache.calcite.tools.RelBuilder;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
+import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.SubPlan;
@@ -71,6 +72,7 @@ import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.type.TypeFactory;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
 
 
 /**
@@ -169,11 +171,10 @@ public class QueryEnvironment {
     try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
-      SubPlan subPlanRoot = toSubPlan(relRoot);
       // TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query.
       // Each SubPlan should be able to run independently from Broker then set the results into the dependent
       // SubPlan for further processing.
-      DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(subPlanRoot, plannerContext, requestId);
+      DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(relRoot, plannerContext, requestId);
       return new QueryPlannerResult(dispatchableSubPlan, null, dispatchableSubPlan.getTableNames());
     } catch (CalciteContextException e) {
       throw new RuntimeException("Error composing query plan for '" + sqlQuery
@@ -195,16 +196,24 @@ public class QueryEnvironment {
    * @param sqlNodeAndOptions parsed SQL query.
    * @return QueryPlannerResult containing the explained query plan and the relRoot.
    */
-  public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions) {
+  public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
     try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
       SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
-      SqlExplainFormat format = explain.getFormat() == null ? SqlExplainFormat.DOT : explain.getFormat();
-      SqlExplainLevel level =
-          explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel();
-      Set<String> tableNames = RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel);
-      return new QueryPlannerResult(null, PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames);
+      if (explain instanceof SqlPhysicalExplain) {
+        // get the physical plan for query.
+        DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(relRoot, plannerContext, requestId);
+        return new QueryPlannerResult(null, PhysicalExplainPlanVisitor.explain(dispatchableSubPlan),
+            dispatchableSubPlan.getTableNames());
+      } else {
+        // get the logical plan for query.
+        SqlExplainFormat format = explain.getFormat() == null ? SqlExplainFormat.DOT : explain.getFormat();
+        SqlExplainLevel level =
+            explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel();
+        Set<String> tableNames = RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel);
+        return new QueryPlannerResult(null, PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames);
+      }
     } catch (Exception e) {
       throw new RuntimeException("Error explain query plan for: " + sqlQuery, e);
     }
@@ -216,15 +225,15 @@ public class QueryEnvironment {
   }
 
   @VisibleForTesting
-  public String explainQuery(String sqlQuery) {
-    return explainQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery)).getExplainPlan();
+  public String explainQuery(String sqlQuery, long requestId) {
+    return explainQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), requestId).getExplainPlan();
   }
 
   public List<String> getTableNamesForQuery(String sqlQuery) {
     try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
       SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode();
       if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
-          sqlNode = ((SqlExplain) sqlNode).getExplicandum();
+        sqlNode = ((SqlExplain) sqlNode).getExplicandum();
       }
       RelRoot relRoot = compileQuery(sqlNode, plannerContext);
       Set<String> tableNames = RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel);
@@ -334,16 +343,16 @@ public class QueryEnvironment {
     // 5. construct a logical query plan.
     PinotLogicalQueryPlanner pinotLogicalQueryPlanner = new PinotLogicalQueryPlanner();
     QueryPlan queryPlan = pinotLogicalQueryPlanner.planQuery(relRoot);
-    SubPlan subPlan = pinotLogicalQueryPlanner.makePlan(queryPlan);
-    return subPlan;
+    return pinotLogicalQueryPlanner.makePlan(queryPlan);
   }
+
+  // 6. construct a dispatchable query plan: RelRoot -> SubPlan -> DispatchableSubPlan, planned once per call.
+  private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) {
+    SubPlan subPlanRoot = toSubPlan(relRoot);
 
-  private DispatchableSubPlan toDispatchableSubPlan(SubPlan subPlan, PlannerContext plannerContext, long requestId) {
-    // 6. construct a dispatchable query plan.
     PinotDispatchPlanner pinotDispatchPlanner =
         new PinotDispatchPlanner(plannerContext, _workerManager, requestId, _tableCache);
-    DispatchableSubPlan dispatchableSubPlan = pinotDispatchPlanner.createDispatchableSubPlan(subPlan);
-    return dispatchableSubPlan;
+    return pinotDispatchPlanner.createDispatchableSubPlan(subPlanRoot);
   }
 
   // --------------------------------------------------------------------------
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
similarity index 94%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
index 60fa47bb53..b0b122b176 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
@@ -42,14 +42,13 @@ import org.apache.pinot.query.routing.QueryServerInstance;
 /**
  * A visitor that converts a {@code QueryPlan} into a human-readable string representation.
  *
- * <p>It is currently not used programmatically and cannot be accessed by the user. Instead,
- * it is intended for use in manual debugging (e.g. setting breakpoints and calling QueryPlan#explain()).
+ * <p>It is used to produce the physical plan of a query, e.g. for {@code EXPLAIN IMPLEMENTATION PLAN FOR ...}.</p>
  */
-public class ExplainPlanPlanVisitor implements PlanNodeVisitor<StringBuilder, ExplainPlanPlanVisitor.Context> {
+public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder, PhysicalExplainPlanVisitor.Context> {
 
   private final DispatchableSubPlan _dispatchableSubPlan;
 
-  public ExplainPlanPlanVisitor(DispatchableSubPlan dispatchableSubPlan) {
+  public PhysicalExplainPlanVisitor(DispatchableSubPlan dispatchableSubPlan) {
     _dispatchableSubPlan = dispatchableSubPlan;
   }
 
@@ -87,7 +86,7 @@ public class ExplainPlanPlanVisitor implements PlanNodeVisitor<StringBuilder, Ex
    */
   public static String explainFrom(DispatchableSubPlan dispatchableSubPlan, PlanNode node,
       QueryServerInstance rootServer) {
-    final ExplainPlanPlanVisitor visitor = new ExplainPlanPlanVisitor(dispatchableSubPlan);
+    final PhysicalExplainPlanVisitor visitor = new PhysicalExplainPlanVisitor(dispatchableSubPlan);
     return node
         .visit(visitor, new Context(rootServer, 0, "", "", new StringBuilder()))
         .toString();
@@ -199,7 +198,7 @@ public class ExplainPlanPlanVisitor implements PlanNodeVisitor<StringBuilder, Ex
             .getServerInstanceToWorkerIdMap();
     context._builder.append("->");
     String receivers = servers.entrySet().stream()
-        .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+        .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
         .map(s -> "[" + receiverStageId + "]@" + s)
         .collect(Collectors.joining(",", "{", "}"));
     return context._builder.append(receivers);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
index d13eee07b0..f2da4731e9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.planner.plannode;
 
-import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
+import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
 
 
 /**
@@ -27,7 +27,7 @@ import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
  * enforced traversal order, and should be implemented by subclasses.
  *
  * <p>It is recommended that implementors use private constructors and static methods to access main
- * functionality (see {@link ExplainPlanPlanVisitor#explain(org.apache.pinot.query.planner.DispatchableSubPlan)}
+ * functionality (see {@link PhysicalExplainPlanVisitor#explain(org.apache.pinot.query.planner.DispatchableSubPlan)}
  * as an example of a usage of this pattern.
  *
  * @param <T> the return type for all visitsPlanNodeVisitor
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 05e837ee77..c10a32eafe 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
-import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
+import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
@@ -47,11 +47,22 @@ import org.testng.annotations.Test;
 
 public class QueryCompilationTest extends QueryEnvironmentTestBase {
 
-  @Test(dataProvider = "testQueryPlanDataProvider")
-  public void testQueryPlanExplain(String query, String digest)
+  @Test(dataProvider = "testQueryLogicalPlanDataProvider")
+  public void testQueryPlanExplainLogical(String query, String digest)
       throws Exception {
+    testQueryPlanExplain(query, digest);
+  }
+
+  @Test(dataProvider = "testQueryPhysicalPlanDataProvider")
+  public void testQueryPlanExplainPhysical(String query, String digest)
+      throws Exception {
+    testQueryPlanExplain(query, digest);
+  }
+
+  private void testQueryPlanExplain(String query, String digest) {
     try {
-      String explainedPlan = _queryEnvironment.explainQuery(query);
+      long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+      String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
       Assert.assertEquals(explainedPlan, digest);
     } catch (RuntimeException e) {
       Assert.fail("failed to explain query: " + query, e);
@@ -123,18 +134,21 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
       if (tableName != null) {
         // table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry).collect(Collectors.toSet()),
+                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .collect(Collectors.toSet()),
             tableName.equals("a") ? ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]")
                 : ImmutableList.of("localhost@{1,1}|[0]"));
       } else if (!PlannerUtils.isRootPlanFragment(stageId)) {
         // join stage should have both servers used.
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry).collect(Collectors.toSet()),
+                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .collect(Collectors.toSet()),
             ImmutableSet.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
       } else {
         // reduce stage should have the reducer instance.
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry).collect(Collectors.toSet()),
+                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .collect(Collectors.toSet()),
             ImmutableSet.of("localhost@{3,3}|[0]"));
       }
     }
@@ -243,12 +257,14 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
       if (tableName != null) {
         // table scan stages; for tableB it should have only 1
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry).collect(Collectors.toSet()),
+                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .collect(Collectors.toSet()),
             ImmutableList.of("localhost@{1,1}|[0]"));
       } else if (!PlannerUtils.isRootPlanFragment(stageId)) {
         // join stage should have both servers used.
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry).collect(Collectors.toSet()),
+                .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .collect(Collectors.toSet()),
             ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
       }
     }
@@ -399,8 +415,8 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     };
   }
 
-  @DataProvider(name = "testQueryPlanDataProvider")
-  private Object[][] provideQueriesWithExplainedPlan() {
+  @DataProvider(name = "testQueryLogicalPlanDataProvider")
+  private Object[][] provideQueriesWithExplainedLogicalPlan() {
     //@formatter:off
     return new Object[][] {
         new Object[]{"EXPLAIN PLAN INCLUDING ALL ATTRIBUTES AS JSON FOR SELECT col1, col3 FROM a",
@@ -455,4 +471,50 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     };
     //@formatter:on
   }
+
+  @DataProvider(name = "testQueryPhysicalPlanDataProvider")
+  private Object[][] provideQueriesWithExplainedPhysicalPlan() {
+    //@formatter:off
+    return new Object[][] {
+new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES AS JSON FOR SELECT col1, col3 FROM a",
+  "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
+  + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "│   └── [1]@localhost:2 PROJECT\n"
+  + "│      └── [1]@localhost:2 TABLE SCAN (a) null\n"
+  + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "   └── [1]@localhost:1 PROJECT\n"
+  + "      └── [1]@localhost:1 TABLE SCAN (a) null\n"},
+new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES AS DOT FOR "
+    + "SELECT col1, COUNT(*) FROM a GROUP BY col1",
+  "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
+  + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+  + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "   └── [1]@localhost:1 AGGREGATE_FINAL\n"
+  + "      └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+  + "         ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+  + "         │   └── [2]@localhost:2 AGGREGATE_LEAF\n"
+  + "         │      └── [2]@localhost:2 TABLE SCAN (a) null\n"
+  + "         └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+  + "            └── [2]@localhost:1 AGGREGATE_LEAF\n"
+  + "               └── [2]@localhost:1 TABLE SCAN (a) null\n"},
+new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
+  "[0]@localhost:3 MAIL_RECEIVE(RANDOM_DISTRIBUTED)\n"
+  + "├── [1]@localhost:2 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+  + "└── [1]@localhost:1 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+  + "   └── [1]@localhost:1 PROJECT\n"
+  + "      └── [1]@localhost:1 JOIN\n"
+  + "         ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+  + "         │   ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+  + "         │   │   └── [2]@localhost:2 PROJECT\n"
+  + "         │   │      └── [2]@localhost:2 TABLE SCAN (a) null\n"
+  + "         │   └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+  + "         │      └── [2]@localhost:1 PROJECT\n"
+  + "         │         └── [2]@localhost:1 TABLE SCAN (a) null\n"
+  + "         └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+  + "            └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[0],[1]@localhost@{1,1}|[1]}\n"
+  + "               └── [3]@localhost:1 PROJECT\n"
+  + "                  └── [3]@localhost:1 TABLE SCAN (b) null\n"}
+    };
+    //@formatter:on
+  }
 }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 135e1cb208..c6308d0854 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.core.routing.RoutingManager;
@@ -40,6 +41,8 @@ import org.testng.annotations.DataProvider;
 
 
 public class QueryEnvironmentTestBase {
+
+  protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
   public static final Map<String, List<String>> SERVER1_SEGMENTS =
       ImmutableMap.of("a_REALTIME", ImmutableList.of("a1", "a2"), "b_REALTIME", ImmutableList.of("b1"), "c_OFFLINE",
           ImmutableList.of("c1"), "d_OFFLINE", ImmutableList.of("d1"));
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
index dc3c17713e..3da8ce8edf 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
@@ -47,7 +47,8 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
   @Test(dataProvider = "testResourceQueryPlannerTestCaseProviderHappyPath")
   public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, String query, String output) {
     try {
-      String explainedPlan = _queryEnvironment.explainQuery(query);
+      long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+      String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
       Assert.assertEquals(explainedPlan, output,
           String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
       String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
@@ -63,7 +64,8 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
   @Test(dataProvider = "testResourceQueryPlannerTestCaseProviderExceptions")
   public void testQueryExplainPlansWithExceptions(String testCaseName, String query, String expectedException) {
     try {
-      _queryEnvironment.explainQuery(query);
+      long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+      _queryEnvironment.explainQuery(query, requestId);
       Assert.fail("Query compilation should have failed with exception message pattern: " + expectedException);
     } catch (Exception e) {
       if (expectedException == null) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 16ca29f878..e5ca95cbbe 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -47,7 +47,7 @@ import org.apache.pinot.core.util.trace.TracedThreadFactory;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
-import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
+import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -100,7 +100,8 @@ public class QueryDispatcher {
           traceEnabled);
     } catch (Exception e) {
       cancel(requestId, dispatchableSubPlan);
-      throw new RuntimeException("Error executing query: " + ExplainPlanPlanVisitor.explain(dispatchableSubPlan), e);
+      throw new RuntimeException("Error executing query: "
+          + PhysicalExplainPlanVisitor.explain(dispatchableSubPlan), e);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org