You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/08 04:36:46 UTC
(pinot) branch master updated: [test][multistage] enhance query plan test (#11966)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b5e982333d [test][multistage] enhance query plan test (#11966)
b5e982333d is described below
commit b5e982333daf7ad2f59fe9c534d8413c0f33ff58
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Nov 7 20:36:39 2023 -0800
[test][multistage] enhance query plan test (#11966)
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../query/planner/PhysicalExplainPlanVisitor.java | 24 +++----
.../apache/pinot/query/QueryCompilationTest.java | 51 --------------
.../pinot/query/QueryEnvironmentTestBase.java | 21 +++++-
.../query/queries/ResourceBasedQueryPlansTest.java | 7 +-
.../resources/queries/ExplainPhysicalPlans.json | 80 ++++++++++++++++++++++
5 files changed, 115 insertions(+), 68 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
index 53dd9bf568..245f531522 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
@@ -156,27 +156,23 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
MailboxSendNode sender = (MailboxSendNode) node.getSender();
int senderStageId = node.getSenderStageId();
DispatchablePlanFragment dispatchablePlanFragment = _dispatchableSubPlan.getQueryStageList().get(senderStageId);
- Map<Integer, Map<String, List<String>>> segments = dispatchablePlanFragment.getWorkerIdToSegmentsMap();
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
dispatchablePlanFragment.getServerInstanceToWorkerIdMap();
Iterator<QueryServerInstance> iterator = serverInstanceToWorkerIdMap.keySet().iterator();
while (iterator.hasNext()) {
QueryServerInstance queryServerInstance = iterator.next();
- for (int workerId : serverInstanceToWorkerIdMap.get(queryServerInstance)) {
- if (segments.containsKey(workerId)) {
- // always print out leaf stages
- sender.visit(this, context.next(iterator.hasNext(), queryServerInstance, workerId));
+ List<Integer> workerIdList = serverInstanceToWorkerIdMap.get(queryServerInstance);
+ for (int idx = 0; idx < workerIdList.size(); idx++) {
+ int workerId = workerIdList.get(idx);
+ if (!iterator.hasNext() && idx == workerIdList.size() - 1) {
+ // always print out the last one
+ sender.visit(this, context.next(false, queryServerInstance, workerId));
} else {
- if (!iterator.hasNext()) {
- // always print out the last one
- sender.visit(this, context.next(false, queryServerInstance, workerId));
- } else {
- // only print short version of the sender node
- appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
- .append(" (Subtree Omitted)")
- .append('\n');
- }
+ // only print short version of the sender node
+ appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
+ .append(" (Subtree Omitted)")
+ .append('\n');
}
}
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index f04167e1b0..6d59f8da22 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -53,12 +53,6 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
testQueryPlanExplain(query, digest);
}
- @Test(dataProvider = "testQueryPhysicalPlanDataProvider")
- public void testQueryPlanExplainPhysical(String query, String digest)
- throws Exception {
- testQueryPlanExplain(query, digest);
- }
-
private void testQueryPlanExplain(String query, String digest) {
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
@@ -466,49 +460,4 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
};
//@formatter:on
}
-
- @DataProvider(name = "testQueryPhysicalPlanDataProvider")
- private Object[][] provideQueriesWithExplainedPhysicalPlan() {
- //@formatter:off
- return new Object[][] {
-new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
- "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
-+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ "│ └── [1]@localhost:1 PROJECT\n"
-+ "│ └── [1]@localhost:1 TABLE SCAN (a) null\n"
-+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ " └── [1]@localhost:2 PROJECT\n"
-+ " └── [1]@localhost:2 TABLE SCAN (a) null\n"},
-new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
-"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
-+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ " └── [1]@localhost:2 AGGREGATE_FINAL\n"
-+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-+ " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ " │ └── [2]@localhost:1 AGGREGATE_LEAF\n"
-+ " │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
-+ " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ " └── [2]@localhost:2 AGGREGATE_LEAF\n"
-+ " └── [2]@localhost:2 TABLE SCAN (a) null\n"},
-new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
- "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
-+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
-+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
-+ " └── [1]@localhost:2 PROJECT\n"
-+ " └── [1]@localhost:2 JOIN\n"
-+ " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-+ " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ " │ │ └── [2]@localhost:1 PROJECT\n"
-+ " │ │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
-+ " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ " │ └── [2]@localhost:2 PROJECT\n"
-+ " │ └── [2]@localhost:2 TABLE SCAN (a) null\n"
-+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
-+ " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
-+ " └── [3]@localhost:1 PROJECT\n"
-+ " └── [3]@localhost:1 TABLE SCAN (b) null\n"}
- };
- //@formatter:on
- }
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 3dbc6833ef..958ae86f8f 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +58,23 @@ public class QueryEnvironmentTestBase {
"d_REALTIME", ImmutableList.of("d2"), "d_OFFLINE", ImmutableList.of("d3"), "e_REALTIME",
ImmutableList.of("e2"), "e_OFFLINE", ImmutableList.of("e3"));
public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>();
+ public static final Map<String, Pair<String, List<List<String>>>> PARTITIONED_SEGMENTS_MAP = new HashMap<>();
+ public static final int PARTITION_COUNT = 4;
+ public static final Map<String, String> PARTITIONED_TABLES =
+ ImmutableMap.of("a_REALTIME", "col2", "b_REALTIME", "col1");
+ static {
+ for (Map.Entry<String, String> e : PARTITIONED_TABLES.entrySet()) {
+ String tableName = e.getKey();
+ String partitionColumn = e.getValue();
+ List<List<String>> partitionIdToSegmentsMap = new ArrayList<>(PARTITION_COUNT);
+ partitionIdToSegmentsMap.add(SERVER1_SEGMENTS.getOrDefault(tableName, Collections.emptyList()));
+ partitionIdToSegmentsMap.add(SERVER2_SEGMENTS.getOrDefault(tableName, Collections.emptyList()));
+ for (int i = 2; i < PARTITION_COUNT; i++) {
+ partitionIdToSegmentsMap.add(new ArrayList<>());
+ }
+ PARTITIONED_SEGMENTS_MAP.put(tableName, Pair.of(partitionColumn, partitionIdToSegmentsMap));
+ }
+ }
static {
TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build());
@@ -84,7 +102,8 @@ public class QueryEnvironmentTestBase {
@BeforeClass
public void setUp() {
// the port doesn't matter as we are not actually making a server call.
- _queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, null);
+ _queryEnvironment =
+ getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, PARTITIONED_SEGMENTS_MAP);
}
@DataProvider(name = "testQueryDataProvider")
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
index c545bd2e29..562e3d23fb 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
@@ -41,6 +41,8 @@ import org.testng.annotations.Test;
public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final String EXPLAIN_REGEX =
+ "EXPLAIN (IMPLEMENTATION )*PLAN (INCLUDING |EXCLUDING )*(ALL )*(ATTRIBUTES )*(AS DOT |AS JSON |AS TEXT )*FOR ";
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
@@ -51,7 +53,8 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
Assert.assertEquals(explainedPlan, output,
String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
- String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
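+ // use a regex to strip the leading "EXPLAIN ... PLAN ... FOR " prefix, including any IMPLEMENTATION, attribute and format options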
+ String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(queryWithoutExplainPlan);
Assert.assertNotNull(dispatchableSubPlan,
String.format("Test case %s for query %s should not have a null QueryPlan",
@@ -66,7 +69,7 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
_queryEnvironment.explainQuery(query, requestId);
- String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
+ String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
_queryEnvironment.planQuery(queryWithoutExplainPlan);
Assert.fail("Query compilation should have failed with exception message pattern: " + expectedException);
} catch (Exception e) {
diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
new file mode 100644
index 0000000000..9f9173a107
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -0,0 +1,80 @@
+{
+ "physical_plan_explain_formats": {
+ "queries": [
+ {
+ "description": "explain plan with attributes",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
+ "output": [
+ "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+ " └── [1]@localhost:2 PROJECT\n",
+ " └── [1]@localhost:2 TABLE SCAN (a) null\n"
+ ]
+ },
+ {
+ "description": "explain plan without attributes",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
+ "output": [
+ "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+ " └── [1]@localhost:2 AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
+ " └── [2]@localhost:2 AGGREGATE_LEAF\n",
+ " └── [2]@localhost:2 TABLE SCAN (a) null\n"
+ ]
+ },
+ {
+ "description": "explain plan with join",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
+ "output": [
+ "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+ " └── [1]@localhost:2 PROJECT\n",
+ " └── [1]@localhost:2 JOIN\n",
+ " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
+ " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
+ " │ └── [2]@localhost:2 PROJECT\n",
+ " │ └── [2]@localhost:2 TABLE SCAN (a) null\n",
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
+ " └── [3]@localhost:1 PROJECT\n",
+ " └── [3]@localhost:1 TABLE SCAN (b) null\n"
+ ]
+ },
+ {
+ "description": "explain plan with join with colocated tables",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
+ "output": [
+ "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
+ " └── [1]@localhost:1 PROJECT\n",
+ " └── [1]@localhost:1 JOIN\n",
+ " ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+ " │ └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
+ " │ └── [2]@localhost:1 PROJECT\n",
+ " │ └── [2]@localhost:1 TABLE SCAN (a) null\n",
+ " └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
+ " └── [3]@localhost:1 PROJECT\n",
+ " └── [3]@localhost:1 FILTER\n",
+ " └── [3]@localhost:1 TABLE SCAN (b) null\n"
+ ]
+ }
+ ]
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org