You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/28 16:58:55 UTC

[iotdb] branch xingtanzjr/polish_node updated: connect from sql to distributed plan

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/polish_node by this push:
     new c63f8a4  connect from sql to distributed plan
c63f8a4 is described below

commit c63f8a498a315fff75a0d69b3e021d502382e207
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 29 00:57:52 2022 +0800

    connect from sql to distributed plan
---
 .../iotdb/db/mpp/execution/QueryExecution.java     | 18 +++++++-
 .../db/mpp/sql/planner/DistributionPlanner.java    | 10 ++---
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  1 -
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 51 +++++++++++++++++++++-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  3 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    | 17 ++++++--
 6 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0b15ef1..4cbbb44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.iotdb.rpc.RpcUtils.getStatus;
@@ -55,6 +56,7 @@ public class QueryExecution {
 
   public QueryExecution(Statement statement, MPPQueryContext context) {
     this.context = context;
+    this.planOptimizers = new ArrayList<>();
     this.analysis = analyze(statement, context);
   }
 
@@ -76,13 +78,13 @@ public class QueryExecution {
   }
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
-  private void doLogicalPlan() {
+  public void doLogicalPlan() {
     LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
     this.logicalPlan = planner.plan(this.analysis);
   }
 
   // Generate the distributed plan and split it into fragments
-  private void doDistributedPlan() {
+  public void doDistributedPlan() {
     DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
     this.distributedPlan = planner.planFragments();
   }
@@ -102,4 +104,16 @@ public class QueryExecution {
 
     return new ExecutionResult(context.getQueryId(), getStatus(TSStatusCode.SUCCESS_STATUS));
   }
+
+  public DistributedQueryPlan getDistributedPlan() {
+    return distributedPlan;
+  }
+
+  public LogicalQueryPlan getLogicalPlan() {
+    return logicalPlan;
+  }
+
+  public List<FragmentInstance> getFragmentInstances() {
+    return fragmentInstances;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index cd26a35..881ea55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -21,13 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner;
 import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
-import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
-import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
-import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
@@ -71,8 +65,10 @@ public class DistributionPlanner {
 
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
+    System.out.println("===== Step 2: Partition SourceNode =====");
     System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    System.out.println("===== Step 3: Add ExchangeNode =====");
     System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
     SubPlan subPlan = splitFragment(rootWithExchange);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 9cb6e59..61f9292 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -95,7 +95,6 @@ public class FragmentInstance implements IConsensusRequest {
             getId(), getHostEndpoint().getIp(), getDataRegionId().getId()));
     ret.append("---- Plan Node Tree ----\n");
     ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
-    ret.append("\n");
     return ret.toString();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index f05a1be..31c3977 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -19,9 +19,16 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 public class PlanNodeUtil {
+  private static final String INDENT = "   ";
+  private static final String BRO = " │─";
+  private static final String CORNER = " └─";
+  private static final String LINE = " │ ";
+
   public static void printPlanNode(PlanNode root) {
     printPlanNodeWithLevel(root, 0);
   }
@@ -42,13 +49,14 @@ public class PlanNodeUtil {
 
   public static String nodeToString(PlanNode root) {
     StringBuilder result = new StringBuilder();
-    nodeToString(root, 0, result);
+//    nodeToString(root, 0, result);
+    nodeToString(root, new PrintContext(false, 0, new TreeMap<>()), result);
     return result.toString();
   }
 
   private static void nodeToString(PlanNode root, int level, StringBuilder result) {
     for (int i = 0; i < level; i++) {
-      result.append("\t");
+      result.append(INDENT);
     }
     result.append(root.toString());
     result.append(System.lineSeparator());
@@ -57,9 +65,48 @@ public class PlanNodeUtil {
     }
   }
 
+  private static void nodeToString(PlanNode root, PrintContext ctx, StringBuilder result) {
+    int level = ctx.level;
+    for (int i = 0 ; i < level; i ++) {
+      result.append(ctx.codeMap.get(i));
+    }
+    result.append(root.toString());
+    result.append(System.lineSeparator());
+    for (int i = 0; i < root.getChildren().size(); i ++) {
+      PlanNode child = root.getChildren().get(i);
+      PrintContext childCtx = ctx.clone();
+      childCtx.isLast = i == root.getChildren().size() - 1;
+      if (childCtx.level - 1 >= 0) {
+        childCtx.codeMap.put(ctx.level - 1, ctx.isLast ? INDENT : LINE);
+      }
+      childCtx.codeMap.put(ctx.level, childCtx.isLast ? CORNER : BRO);
+      childCtx.level ++;
+      nodeToString(child, childCtx, result);
+    }
+  }
+
   public static PlanNode deepCopy(PlanNode root) {
     List<PlanNode> children =
         root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
     return root.cloneWithChildren(children);
   }
+
+  private static class PrintContext {
+    public boolean isLast;
+    public int level;
+    public Map<Integer, String> codeMap;
+    public PrintContext() {
+      codeMap = new TreeMap<>();
+    }
+
+    public PrintContext(boolean isLast, int level, Map<Integer, String> codeMap) {
+      this.isLast = isLast;
+      this.level = level;
+      this.codeMap = codeMap;
+    }
+
+    public PrintContext clone() {
+      return new PrintContext(isLast, level, new TreeMap<>(codeMap));
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index aba4a00..8f5ff02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -72,7 +73,7 @@ public class DistributionPlannerTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
     PlanNode newRoot = planner.rewriteSource();
 
-    //        PlanNodeUtil.printPlanNode(newRoot);
+    System.out.println(PlanNodeUtil.nodeToString(newRoot));
     assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index bf8580b..88e5690 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryExecution;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.junit.Test;
 
@@ -35,12 +37,21 @@ public class QueryPlannerTest {
   @Test
   public void TestSqlToDistributedPlan() {
 
-    String querySql = "SELECT d1.*, d22.s1 FROM root.sg";
+    String querySql = "SELECT d1.*, d333.s1 FROM root.sg LIMIT 10";
 
     Statement stmt = StatementGenerator.createStatement(querySql, ZoneId.systemDefault());
-    System.out.println(stmt);
 
     QueryExecution queryExecution = new QueryExecution(stmt, new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
-    System.out.println(queryExecution);
+    queryExecution.doLogicalPlan();
+    System.out.printf("SQL: %s%n%n", querySql);
+    System.out.println("===== Step 1: Logical Plan =====");
+    System.out.println(PlanNodeUtil.nodeToString(queryExecution.getLogicalPlan().getRootNode()));
+
+    queryExecution.doDistributedPlan();
+    DistributedQueryPlan distributedQueryPlan = queryExecution.getDistributedPlan();
+
+    System.out.println("===== Step 4: Split Fragment Instance =====");
+    distributedQueryPlan.getInstances().forEach(System.out::println);
+
   }
 }