You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/08 04:36:46 UTC

(pinot) branch master updated: [test][multistage] enhance query plan test (#11966)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b5e982333d [test][multistage] enhance query plan test (#11966)
b5e982333d is described below

commit b5e982333daf7ad2f59fe9c534d8413c0f33ff58
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Nov 7 20:36:39 2023 -0800

    [test][multistage] enhance query plan test (#11966)
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../query/planner/PhysicalExplainPlanVisitor.java  | 24 +++----
 .../apache/pinot/query/QueryCompilationTest.java   | 51 --------------
 .../pinot/query/QueryEnvironmentTestBase.java      | 21 +++++-
 .../query/queries/ResourceBasedQueryPlansTest.java |  7 +-
 .../resources/queries/ExplainPhysicalPlans.json    | 80 ++++++++++++++++++++++
 5 files changed, 115 insertions(+), 68 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
index 53dd9bf568..245f531522 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
@@ -156,27 +156,23 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
     MailboxSendNode sender = (MailboxSendNode) node.getSender();
     int senderStageId = node.getSenderStageId();
     DispatchablePlanFragment dispatchablePlanFragment = _dispatchableSubPlan.getQueryStageList().get(senderStageId);
-    Map<Integer, Map<String, List<String>>> segments = dispatchablePlanFragment.getWorkerIdToSegmentsMap();
 
     Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
         dispatchablePlanFragment.getServerInstanceToWorkerIdMap();
     Iterator<QueryServerInstance> iterator = serverInstanceToWorkerIdMap.keySet().iterator();
     while (iterator.hasNext()) {
       QueryServerInstance queryServerInstance = iterator.next();
-      for (int workerId : serverInstanceToWorkerIdMap.get(queryServerInstance)) {
-        if (segments.containsKey(workerId)) {
-          // always print out leaf stages
-          sender.visit(this, context.next(iterator.hasNext(), queryServerInstance, workerId));
+      List<Integer> workerIdList = serverInstanceToWorkerIdMap.get(queryServerInstance);
+      for (int idx = 0; idx < workerIdList.size(); idx++) {
+        int workerId = workerIdList.get(idx);
+        if (!iterator.hasNext() && idx == workerIdList.size() - 1) {
+          // always print out the last one
+          sender.visit(this, context.next(false, queryServerInstance, workerId));
         } else {
-          if (!iterator.hasNext()) {
-            // always print out the last one
-            sender.visit(this, context.next(false, queryServerInstance, workerId));
-          } else {
-            // only print short version of the sender node
-            appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
-                .append(" (Subtree Omitted)")
-                .append('\n');
-          }
+          // only print short version of the sender node
+          appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
+              .append(" (Subtree Omitted)")
+              .append('\n');
         }
       }
     }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index f04167e1b0..6d59f8da22 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -53,12 +53,6 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     testQueryPlanExplain(query, digest);
   }
 
-  @Test(dataProvider = "testQueryPhysicalPlanDataProvider")
-  public void testQueryPlanExplainPhysical(String query, String digest)
-      throws Exception {
-    testQueryPlanExplain(query, digest);
-  }
-
   private void testQueryPlanExplain(String query, String digest) {
     try {
       long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
@@ -466,49 +460,4 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     };
     //@formatter:on
   }
-
-  @DataProvider(name = "testQueryPhysicalPlanDataProvider")
-  private Object[][] provideQueriesWithExplainedPhysicalPlan() {
-    //@formatter:off
-    return new Object[][] {
-new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
-  "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
-+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ "│   └── [1]@localhost:1 PROJECT\n"
-+ "│       └── [1]@localhost:1 TABLE SCAN (a) null\n"
-+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ "    └── [1]@localhost:2 PROJECT\n"
-+ "        └── [1]@localhost:2 TABLE SCAN (a) null\n"},
-new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
-"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
-+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ "    └── [1]@localhost:2 AGGREGATE_FINAL\n"
-+ "        └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-+ "            ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ "            │   └── [2]@localhost:1 AGGREGATE_LEAF\n"
-+ "            │       └── [2]@localhost:1 TABLE SCAN (a) null\n"
-+ "            └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ "                └── [2]@localhost:2 AGGREGATE_LEAF\n"
-+ "                    └── [2]@localhost:2 TABLE SCAN (a) null\n"},
-new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
-  "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
-+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ "    └── [1]@localhost:2 PROJECT\n"
-+ "        └── [1]@localhost:2 JOIN\n"
-+ "            ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-+ "            │   ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ "            │   │   └── [2]@localhost:1 PROJECT\n"
-+ "            │   │       └── [2]@localhost:1 TABLE SCAN (a) null\n"
-+ "            │   └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ "            │       └── [2]@localhost:2 PROJECT\n"
-+ "            │           └── [2]@localhost:2 TABLE SCAN (a) null\n"
-+ "            └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-+ "                └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ "                    └── [3]@localhost:1 PROJECT\n"
-+ "                        └── [3]@localhost:1 TABLE SCAN (b) null\n"}
-    };
-    //@formatter:on
-  }
 }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 3dbc6833ef..958ae86f8f 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +58,23 @@ public class QueryEnvironmentTestBase {
           "d_REALTIME", ImmutableList.of("d2"), "d_OFFLINE", ImmutableList.of("d3"), "e_REALTIME",
           ImmutableList.of("e2"), "e_OFFLINE", ImmutableList.of("e3"));
   public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>();
+  public static final Map<String, Pair<String, List<List<String>>>> PARTITIONED_SEGMENTS_MAP = new HashMap<>();
+  public static final int PARTITION_COUNT = 4;
+  public static final Map<String, String> PARTITIONED_TABLES =
+      ImmutableMap.of("a_REALTIME", "col2", "b_REALTIME", "col1");
+  static {
+    for (Map.Entry<String, String> e : PARTITIONED_TABLES.entrySet()) {
+      String tableName = e.getKey();
+      String partitionColumn = e.getValue();
+      List<List<String>> partitionIdToSegmentsMap = new ArrayList<>(PARTITION_COUNT);
+      partitionIdToSegmentsMap.add(SERVER1_SEGMENTS.getOrDefault(tableName, Collections.emptyList()));
+      partitionIdToSegmentsMap.add(SERVER2_SEGMENTS.getOrDefault(tableName, Collections.emptyList()));
+      for (int i = 2; i < PARTITION_COUNT; i++) {
+        partitionIdToSegmentsMap.add(new ArrayList<>());
+      }
+      PARTITIONED_SEGMENTS_MAP.put(tableName, Pair.of(partitionColumn, partitionIdToSegmentsMap));
+    }
+  }
 
   static {
     TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build());
@@ -84,7 +102,8 @@ public class QueryEnvironmentTestBase {
   @BeforeClass
   public void setUp() {
     // the port doesn't matter as we are not actually making a server call.
-    _queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, null);
+    _queryEnvironment =
+        getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, PARTITIONED_SEGMENTS_MAP);
   }
 
   @DataProvider(name = "testQueryDataProvider")
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
index c545bd2e29..562e3d23fb 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
@@ -41,6 +41,8 @@ import org.testng.annotations.Test;
 
 public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
   private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final String EXPLAIN_REGEX =
+      "EXPLAIN (IMPLEMENTATION )*PLAN (INCLUDING |EXCLUDING )*(ALL )*(ATTRIBUTES )*(AS DOT |AS JSON |AS TEXT )*FOR ";
   private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
   private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
 
@@ -51,7 +53,8 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
       String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
       Assert.assertEquals(explainedPlan, output,
           String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
-      String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
+      // use a regex to strip the "EXPLAIN [IMPLEMENTATION] PLAN [INCLUDING|EXCLUDING] [ALL] [ATTRIBUTES] [AS ...] FOR " prefix
+      String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
       DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(queryWithoutExplainPlan);
       Assert.assertNotNull(dispatchableSubPlan,
           String.format("Test case %s for query %s should not have a null QueryPlan",
@@ -66,7 +69,7 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
     try {
       long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
       _queryEnvironment.explainQuery(query, requestId);
-      String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
+      String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
       _queryEnvironment.planQuery(queryWithoutExplainPlan);
       Assert.fail("Query compilation should have failed with exception message pattern: " + expectedException);
     } catch (Exception e) {
diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
new file mode 100644
index 0000000000..9f9173a107
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -0,0 +1,80 @@
+{
+  "physical_plan_explain_formats": {
+    "queries": [
+      {
+        "description": "explain plan with attributes",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
+        "output": [
+          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+          "    └── [1]@localhost:2 PROJECT\n",
+          "        └── [1]@localhost:2 TABLE SCAN (a) null\n"
+        ]
+      },
+      {
+        "description": "explain plan without attributes",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
+        "output": [
+          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+          "    └── [1]@localhost:2 AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
+          "                └── [2]@localhost:2 AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:2 TABLE SCAN (a) null\n"
+        ]
+      },
+      {
+        "description": "explain plan with join",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
+        "output": [
+          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+          "    └── [1]@localhost:2 PROJECT\n",
+          "        └── [1]@localhost:2 JOIN\n",
+          "            ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
+          "            │   └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
+          "            │       └── [2]@localhost:2 PROJECT\n",
+          "            │           └── [2]@localhost:2 TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
+          "                    └── [3]@localhost:1 PROJECT\n",
+          "                        └── [3]@localhost:1 TABLE SCAN (b) null\n"
+        ]
+      },
+      {
+        "description": "explain plan with join with colocated tables",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
+        "output": [
+          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+          "    └── [1]@localhost:1 PROJECT\n",
+          "        └── [1]@localhost:1 JOIN\n",
+          "            ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+          "            │   └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
+          "            │       └── [2]@localhost:1 PROJECT\n",
+          "            │           └── [2]@localhost:1 TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+          "                └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
+          "                    └── [3]@localhost:1 PROJECT\n",
+          "                        └── [3]@localhost:1 FILTER\n",
+          "                            └── [3]@localhost:1 TABLE SCAN (b) null\n"
+        ]
+      }
+    ]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org