You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/28 16:58:55 UTC
[iotdb] branch xingtanzjr/polish_node updated: connect from sql to distributed plan
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/polish_node by this push:
new c63f8a4 connect from sql to distributed plan
c63f8a4 is described below
commit c63f8a498a315fff75a0d69b3e021d502382e207
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 29 00:57:52 2022 +0800
connect from sql to distributed plan
---
.../iotdb/db/mpp/execution/QueryExecution.java | 18 +++++++-
.../db/mpp/sql/planner/DistributionPlanner.java | 10 ++---
.../db/mpp/sql/planner/plan/FragmentInstance.java | 1 -
.../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 51 +++++++++++++++++++++-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 3 +-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 17 ++++++--
6 files changed, 84 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0b15ef1..4cbbb44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.rpc.TSStatusCode;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import static org.apache.iotdb.rpc.RpcUtils.getStatus;
@@ -55,6 +56,7 @@ public class QueryExecution {
public QueryExecution(Statement statement, MPPQueryContext context) {
this.context = context;
+ this.planOptimizers = new ArrayList<>();
this.analysis = analyze(statement, context);
}
@@ -76,13 +78,13 @@ public class QueryExecution {
}
// Use LogicalPlanner to do the logical query plan and logical optimization
- private void doLogicalPlan() {
+ public void doLogicalPlan() {
LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
this.logicalPlan = planner.plan(this.analysis);
}
// Generate the distributed plan and split it into fragments
- private void doDistributedPlan() {
+ public void doDistributedPlan() {
DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
this.distributedPlan = planner.planFragments();
}
@@ -102,4 +104,16 @@ public class QueryExecution {
return new ExecutionResult(context.getQueryId(), getStatus(TSStatusCode.SUCCESS_STATUS));
}
+
+ public DistributedQueryPlan getDistributedPlan() {
+ return distributedPlan;
+ }
+
+ public LogicalQueryPlan getLogicalPlan() {
+ return logicalPlan;
+ }
+
+ public List<FragmentInstance> getFragmentInstances() {
+ return fragmentInstances;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index cd26a35..881ea55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -21,13 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
-import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
-import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
-import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
@@ -71,8 +65,10 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
+ System.out.println("===== Step 2: Partition SourceNode =====");
System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+ System.out.println("===== Step 3: Add ExchangeNode =====");
System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
SubPlan subPlan = splitFragment(rootWithExchange);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 9cb6e59..61f9292 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -95,7 +95,6 @@ public class FragmentInstance implements IConsensusRequest {
getId(), getHostEndpoint().getIp(), getDataRegionId().getId()));
ret.append("---- Plan Node Tree ----\n");
ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
- ret.append("\n");
return ret.toString();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index f05a1be..31c3977 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -19,9 +19,16 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.stream.Collectors;
public class PlanNodeUtil {
+ private static final String INDENT = " ";
+ private static final String BRO = " │─";
+ private static final String CORNER = " └─";
+ private static final String LINE = " │ ";
+
public static void printPlanNode(PlanNode root) {
printPlanNodeWithLevel(root, 0);
}
@@ -42,13 +49,14 @@ public class PlanNodeUtil {
public static String nodeToString(PlanNode root) {
StringBuilder result = new StringBuilder();
- nodeToString(root, 0, result);
+// nodeToString(root, 0, result);
+ nodeToString(root, new PrintContext(false, 0, new TreeMap<>()), result);
return result.toString();
}
private static void nodeToString(PlanNode root, int level, StringBuilder result) {
for (int i = 0; i < level; i++) {
- result.append("\t");
+ result.append(INDENT);
}
result.append(root.toString());
result.append(System.lineSeparator());
@@ -57,9 +65,48 @@ public class PlanNodeUtil {
}
}
+ private static void nodeToString(PlanNode root, PrintContext ctx, StringBuilder result) {
+ int level = ctx.level;
+ for (int i = 0 ; i < level; i ++) {
+ result.append(ctx.codeMap.get(i));
+ }
+ result.append(root.toString());
+ result.append(System.lineSeparator());
+ for (int i = 0; i < root.getChildren().size(); i ++) {
+ PlanNode child = root.getChildren().get(i);
+ PrintContext childCtx = ctx.clone();
+ childCtx.isLast = i == root.getChildren().size() - 1;
+ if (childCtx.level - 1 >= 0) {
+ childCtx.codeMap.put(ctx.level - 1, ctx.isLast ? INDENT : LINE);
+ }
+ childCtx.codeMap.put(ctx.level, childCtx.isLast ? CORNER : BRO);
+ childCtx.level ++;
+ nodeToString(child, childCtx, result);
+ }
+ }
+
public static PlanNode deepCopy(PlanNode root) {
List<PlanNode> children =
root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
return root.cloneWithChildren(children);
}
+
+ private static class PrintContext {
+ public boolean isLast;
+ public int level;
+ public Map<Integer, String> codeMap;
+ public PrintContext() {
+ codeMap = new TreeMap<>();
+ }
+
+ public PrintContext(boolean isLast, int level, Map<Integer, String> codeMap) {
+ this.isLast = isLast;
+ this.level = level;
+ this.codeMap = codeMap;
+ }
+
+ public PrintContext clone() {
+ return new PrintContext(isLast, level, new TreeMap<>(codeMap));
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index aba4a00..8f5ff02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -72,7 +73,7 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
PlanNode newRoot = planner.rewriteSource();
- // PlanNodeUtil.printPlanNode(newRoot);
+ System.out.println(PlanNodeUtil.nodeToString(newRoot));
assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index bf8580b..88e5690 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.QueryExecution;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.junit.Test;
@@ -35,12 +37,21 @@ public class QueryPlannerTest {
@Test
public void TestSqlToDistributedPlan() {
- String querySql = "SELECT d1.*, d22.s1 FROM root.sg";
+ String querySql = "SELECT d1.*, d333.s1 FROM root.sg LIMIT 10";
Statement stmt = StatementGenerator.createStatement(querySql, ZoneId.systemDefault());
- System.out.println(stmt);
QueryExecution queryExecution = new QueryExecution(stmt, new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
- System.out.println(queryExecution);
+ queryExecution.doLogicalPlan();
+ System.out.printf("SQL: %s%n%n", querySql);
+ System.out.println("===== Step 1: Logical Plan =====");
+ System.out.println(PlanNodeUtil.nodeToString(queryExecution.getLogicalPlan().getRootNode()));
+
+ queryExecution.doDistributedPlan();
+ DistributedQueryPlan distributedQueryPlan = queryExecution.getDistributedPlan();
+
+ System.out.println("===== Step 4: Split Fragment Instance =====");
+ distributedQueryPlan.getInstances().forEach(System.out::println);
+
}
}