You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/22 03:16:01 UTC

[iotdb] branch xingtanzjr/distribution_planner created (now 38c2c89)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/distribution_planner
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 38c2c89  complete SubPlan builder

This branch includes the following new commits:

     new 38c2c89  complete SubPlan builder

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: complete SubPlan builder

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/distribution_planner
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 38c2c89487720a3529652e9d0d02e560226240aa
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 22 11:15:04 2022 +0800

    complete SubPlan builder
---
 .../mpp/sql/planner/plan/DistributedQueryPlan.java | 13 +++--
 .../mpp/sql/planner/plan/DistributionPlanner.java  | 52 ++++++++++++++++-
 .../db/mpp/sql/planner/plan/PlanFragment.java      | 17 ++++++
 .../db/mpp/sql/planner/plan/PlanFragmentId.java    | 15 +++++
 .../iotdb/db/mpp/sql/planner/plan/SubPlan.java     | 68 ++++++++++++++++++++++
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  4 ++
 .../planner/plan/node/process/ExchangeNode.java    | 43 +++++++++-----
 .../planner/plan/node/sink/FragmentSinkNode.java   | 28 ++++++++-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 35 +++++++++++
 9 files changed, 252 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 8cf0341..bb0c299 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -19,15 +19,18 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 import java.util.List;
 
 public class DistributedQueryPlan {
   private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
-
-  // TODO: consider whether this field is necessary when do the implementation
+  private SubPlan rootSubPlan;
   private List<PlanFragment> fragments;
+
+  public DistributedQueryPlan(
+      MPPQueryContext context, SubPlan rootSubPlan, List<PlanFragment> fragments) {
+    this.context = context;
+    this.rootSubPlan = rootSubPlan;
+    this.fragments = fragments;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index b8ac550..2df1f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.common.DataRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 
@@ -50,8 +51,17 @@ public class DistributionPlanner {
     return adder.visit(root, new NodeGroupContext());
   }
 
+  public SubPlan splitFragment(PlanNode root) {
+    FragmentBuilder fragmentBuilder = new FragmentBuilder();
+    return fragmentBuilder.splitToSubPlan(root);
+  }
+
   public DistributedQueryPlan planFragments() {
-    return null;
+    PlanNode rootAfterRewrite = rewriteSource();
+    PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    SubPlan subPlan = splitFragment(rootWithExchange);
+    return new DistributedQueryPlan(
+        logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList());
   }
 
   private class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -185,7 +195,7 @@ public class DistributionPlanner {
           child -> {
             if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
               ExchangeNode exchangeNode = new ExchangeNode(PlanNodeIdAllocator.generateId());
-              exchangeNode.setSourceNode(child);
+              exchangeNode.setChild(child);
               newNode.addChild(exchangeNode);
             } else {
               newNode.addChild(child);
@@ -251,4 +261,42 @@ public class DistributionPlanner {
       this.dataRegion = dataRegion;
     }
   }
+
+  private class FragmentBuilder {
+    public SubPlan splitToSubPlan(PlanNode root) {
+      SubPlan rootSubPlan = createSubPlan(root);
+      splitToSubPlan(root, rootSubPlan);
+      return rootSubPlan;
+    }
+
+    private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+      if (root instanceof ExchangeNode) {
+        // We add a FragmentSinkNode for the newly created PlanFragment
+        ExchangeNode exchangeNode = (ExchangeNode) root;
+        FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
+        sinkNode.setChild(exchangeNode.getChild());
+        sinkNode.setRemoteDestinationNode(exchangeNode);
+        // Record the source node info in the ExchangeNode so that the connection between
+        // these nodes/fragments is preserved
+        exchangeNode.setRemoteSourceNode(sinkNode);
+        // We cut off the subtree to make the ExchangeNode a leaf node of the current PlanFragment
+        exchangeNode.cleanChildren();
+
+        // Build the child SubPlan Tree
+        SubPlan childSubPlan = createSubPlan(sinkNode);
+        splitToSubPlan(sinkNode, childSubPlan);
+
+        subPlan.addChild(childSubPlan);
+        return;
+      }
+      for (PlanNode child : root.getChildren()) {
+        splitToSubPlan(child, subPlan);
+      }
+    }
+
+    private SubPlan createSubPlan(PlanNode root) {
+      PlanFragment fragment = new PlanFragment(PlanFragmentId.generateId(), root);
+      return new SubPlan(fragment);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 0aa3ac5..f8de7f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -25,4 +25,21 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 public class PlanFragment {
   private PlanFragmentId id;
   private PlanNode root;
+
+  public PlanFragment(PlanFragmentId id, PlanNode root) {
+    this.id = id;
+    this.root = root;
+  }
+
+  public PlanFragmentId getId() {
+    return id;
+  }
+
+  public PlanNode getRoot() {
+    return root;
+  }
+
+  public String toString() {
+    return String.format("PlanFragment-%s", getId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
index 38a56f3..4f83c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
@@ -24,4 +24,19 @@ public class PlanFragmentId {
   public PlanFragmentId(String id) {
     this.id = id;
   }
+
+  public static int initialId = 0;
+
+  public static synchronized PlanFragmentId generateId() {
+    initialId++;
+    return new PlanFragmentId(String.valueOf(initialId));
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String toString() {
+    return id;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
new file mode 100644
index 0000000..1ce2f1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SubPlan {
+  private PlanFragment planFragment;
+  private List<SubPlan> children;
+
+  public SubPlan(PlanFragment planFragment) {
+    this.planFragment = planFragment;
+    this.children = new ArrayList<>();
+  }
+
+  public void setChildren(List<SubPlan> children) {
+    this.children = children;
+  }
+
+  public void addChild(SubPlan subPlan) {
+    this.children.add(subPlan);
+  }
+
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append(
+        String.format(
+            "SubPlan-%s. RootNodeId: %s\n", planFragment.getId(), planFragment.getRoot().getId()));
+    children.forEach(result::append);
+    return result.toString();
+  }
+
+  public PlanFragment getPlanFragment() {
+    return this.planFragment;
+  }
+
+  public List<SubPlan> getChildren() {
+    return this.children;
+  }
+
+  public List<PlanFragment> getPlanFragmentList() {
+    List<PlanFragment> result = new ArrayList<>();
+    result.add(this.planFragment);
+    this.children.forEach(
+        child -> {
+          result.add(child.getPlanFragment());
+        });
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 0be251d..b0ff572 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -73,4 +73,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitTimeJoin(TimeJoinNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitExchange(ExchangeNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 4a36a75..be2b351 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -19,17 +19,23 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 
 public class ExchangeNode extends PlanNode {
-  private PlanNode sourceNode;
-  private FragmentId sourceFragmentId;
+  private PlanNode child;
+  private FragmentSinkNode remoteSourceNode;
+
+  // In the current version, each ExchangeNode has exactly one source,
+  // and the fragment that the source node belongs to has only one instance.
+  // Thus, the nodeId and endpoint together tell the ExchangeNode where its source is.
+  private PlanNodeId sourceNodeId; // sourceNodeId is the same as the child's PlanNodeId
+  private String sourceEndpoint; // The endpoint where the sourceNode will be distributed
 
   public ExchangeNode(PlanNodeId id) {
     super(id);
@@ -37,7 +43,10 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return ImmutableList.of(sourceNode);
+    if (this.child == null) {
+      return ImmutableList.of();
+    }
+    return ImmutableList.of(child);
   }
 
   @Override
@@ -48,7 +57,7 @@ public class ExchangeNode extends PlanNode {
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
     ExchangeNode node = new ExchangeNode(getId());
-    node.setSourceNode(children.get(0));
+    node.setChild(children.get(0));
     return node;
   }
 
@@ -57,23 +66,27 @@ public class ExchangeNode extends PlanNode {
     return null;
   }
 
-  public void setSourceFragmentId(FragmentId sourceFragmentId) {
-    this.sourceFragmentId = sourceFragmentId;
+  public PlanNode getChild() {
+    return child;
   }
 
-  public FragmentId getSourceFragmentId() {
-    return sourceFragmentId;
+  public void setChild(PlanNode child) {
+    this.child = child;
   }
 
-  public PlanNode getSourceNode() {
-    return sourceNode;
+  public String toString() {
+    return String.format("ExchangeNode-%s: [Source: %s]", getId(), remoteSourceNode);
   }
 
-  public void setSourceNode(PlanNode sourceNode) {
-    this.sourceNode = sourceNode;
+  public FragmentSinkNode getRemoteSourceNode() {
+    return remoteSourceNode;
   }
 
-  public String toString() {
-    return String.format("ExchangeNode-%s", getId());
+  public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
+    this.remoteSourceNode = remoteSourceNode;
+  }
+
+  public void cleanChildren() {
+    this.child = null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 6151916..935ca13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -20,17 +20,23 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+
+import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 
 public class FragmentSinkNode extends SinkNode {
+  private PlanNode child;
+  private ExchangeNode remoteDestinationNode;
+
   public FragmentSinkNode(PlanNodeId id) {
     super(id);
   }
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return ImmutableList.of(child);
   }
 
   @Override
@@ -53,4 +59,24 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public void close() throws Exception {}
+
+  public PlanNode getChild() {
+    return child;
+  }
+
+  public void setChild(PlanNode child) {
+    this.child = child;
+  }
+
+  public String toString() {
+    return String.format("FragmentSinkNode-%s", getId());
+  }
+
+  public ExchangeNode getRemoteDestinationNode() {
+    return remoteDestinationNode;
+  }
+
+  public void setRemoteDestinationNode(ExchangeNode remoteDestinationNode) {
+    this.remoteDestinationNode = remoteDestinationNode;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index b3e5e3c..ec90301 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -24,8 +24,11 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
@@ -104,6 +107,38 @@ public class DistributionPlannerTest {
         rootWithExchange.getChildren().get(0).getChildren().get(2).getChildren().size(), 1);
   }
 
+  @Test
+  public void TestSplitFragment() throws IllegalPathException {
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(
+            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+
+    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+
+    Analysis analysis = constructAnalysis();
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+    PlanNode rootAfterRewrite = planner.rewriteSource();
+    PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+    PlanNodeUtil.printPlanNode(rootWithExchange);
+    SubPlan subPlan = planner.splitFragment(rootWithExchange);
+    System.out.println(subPlan);
+    List<PlanFragment> fragments = subPlan.getPlanFragmentList();
+    fragments.forEach(
+        f -> {
+          System.out.println(f);
+          PlanNodeUtil.printPlanNode(f.getRoot());
+        });
+  }
+
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
     Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo = new HashMap<>();