You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/20 08:06:34 UTC

[iotdb] branch xingtanzjr/logical_to_distributed updated: complete source rewriter for distributed plan

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/logical_to_distributed
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/logical_to_distributed by this push:
     new b17c657  complete source rewriter for distributed plan
b17c657 is described below

commit b17c6579a78e450ab7ed0be3a29a977a6c0ffd6d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sun Mar 20 16:05:47 2022 +0800

    complete source rewriter for distributed plan
---
 .../org/apache/iotdb/db/mpp/common/Analysis.java   |  4 +
 .../org/apache/iotdb/db/mpp/common/DataRegion.java |  9 +++
 .../iotdb/db/mpp/common/FilterNullPolicy.java      |  1 +
 .../sql/planner/optimization/PlanOptimizer.java    |  2 +-
 .../mpp/sql/planner/plan/DistributionPlanner.java  |  9 ++-
 .../db/mpp/sql/planner/plan/LogicalQueryPlan.java  | 13 ++++
 .../planner/plan/node/PlanNodeAllocator.java}      | 14 ++--
 .../planner/plan/node/PlanNodeUtil.java}           | 28 +++----
 .../sql/planner/plan/node/process/LimitNode.java   | 25 ++++--
 .../planner/plan/node/process/TimeJoinNode.java    | 27 +++++--
 .../planner/plan/node/source/SeriesScanNode.java   | 10 ++-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 91 ++++++++++++++++++++++
 12 files changed, 199 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
index 08fbc45..85ff111 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
@@ -53,4 +53,8 @@ public class Analysis {
             return null;
         }
     }
+
+    public void setDataPartitionInfo(Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo) {
+        this.dataPartitionInfo = dataPartitionInfo;
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
index a46afdf..5618e1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
@@ -27,6 +27,11 @@ public class DataRegion {
     private Integer dataRegionId;
     private String endpoint;
 
+    public DataRegion(Integer dataRegionId, String endpoint) {
+        this.dataRegionId = dataRegionId;
+        this.endpoint = endpoint;
+    }
+
     public int hashCode() {
         return dataRegionId.hashCode();
     }
@@ -37,4 +42,8 @@ public class DataRegion {
         }
         return false;
     }
+
+    public String toString() {
+        return String.format("%s/%d",this.endpoint, this.dataRegionId);
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
index 7979b27..9de26fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.common;
 
 public enum FilterNullPolicy {
+  NO_FILTER,
   CONTAINS_NULL,
   ALL_NULL
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
index f98f014..ff2baae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
@@ -23,5 +23,5 @@ import org.apache.iotdb.db.mpp.common.TsBlock;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 public interface PlanOptimizer {
-  PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
+  PlanNode optimize(PlanNode plan, QueryContext context);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index d9fa64c..9c55a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -37,6 +37,11 @@ public class DistributionPlanner {
         this.logicalPlan = logicalPlan;
     }
 
+    public PlanNode rewriteSource() {
+        SourceRewriter rewriter = new SourceRewriter();
+        return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext());
+    }
+
     public DistributedQueryPlan planFragments() {
         return null;
     }
@@ -64,7 +69,7 @@ public class DistributionPlanner {
                 } else {
                     // In a general logical query plan, the children of TimeJoinNode should only be SeriesScanNode or SeriesAggregateScanNode
                     // So this branch should not be touched.
-                    root.addChild(generateDistributedPlan(child, context));
+                    root.addChild(visit(child, context));
                 }
             }
 
@@ -93,7 +98,7 @@ public class DistributionPlanner {
             return root;
         }
 
-        public PlanNode generateDistributedPlan(PlanNode node, DistributionPlanContext context) {
+        public PlanNode visit(PlanNode node, DistributionPlanContext context) {
             return node.accept(this, context);
         }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
index 666fbf3..788746a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
@@ -28,4 +28,17 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 public class LogicalQueryPlan {
   private QueryContext context;
   private PlanNode rootNode;
+
+  public LogicalQueryPlan(QueryContext context, PlanNode rootNode) {
+    this.context = context;
+    this.rootNode = rootNode;
+  }
+
+  public PlanNode getRootNode() {
+    return rootNode;
+  }
+
+  public QueryContext getContext() {
+    return context;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeAllocator.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeAllocator.java
index 7979b27..edac517 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeAllocator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,9 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
 
-public enum FilterNullPolicy {
-  CONTAINS_NULL,
-  ALL_NULL
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+public class PlanNodeAllocator {
+    public static int initialId = 0;
+    public static synchronized PlanNodeId generateId() {
+        initialId++;
+        return new PlanNodeId(String.valueOf(initialId));
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index a46afdf..f174b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -17,24 +17,24 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
-/**
- * This class is used to represent the data partition info including the DataRegionId and physical node IP address
- */
-//TODO: (xingtanzjr) This class should be substituted with the class defined in Consensus level
-public class DataRegion {
-    private Integer dataRegionId;
-    private String endpoint;
+public class PlanNodeUtil {
+    public static void printPlanNode(PlanNode root) {
+        printPlanNodeWithLevel(root, 0);
+    }
 
-    public int hashCode() {
-        return dataRegionId.hashCode();
+    private static void printPlanNodeWithLevel(PlanNode root, int level) {
+        printTab(level);
+        System.out.println(root.toString());
+        for (PlanNode child : root.getChildren()) {
+            printPlanNodeWithLevel(child, level + 1);
+        }
     }
 
-    public boolean equals(Object obj) {
-        if (obj instanceof DataRegion) {
-            return this.dataRegionId.equals(((DataRegion)obj).dataRegionId);
+    private static void printTab(int count) {
+        for(int i = 0 ; i < count; i ++) {
+            System.out.print("\t");
         }
-        return false;
     }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 5c5b1bc..06a5848 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
@@ -30,12 +31,16 @@ import java.util.List;
 public class LimitNode extends ProcessNode {
 
   // The limit count
-  private final int limit;
-  private final PlanNode child;
+  private int limit;
+  private PlanNode child;
 
-  public LimitNode(PlanNodeId id, int limit, PlanNode child) {
+  public LimitNode(PlanNodeId id, int limit) {
     super(id);
     this.limit = limit;
+  }
+
+  public LimitNode(PlanNodeId id, int limit, PlanNode child) {
+    this(id, limit);
     this.child = child;
   }
 
@@ -46,12 +51,14 @@ public class LimitNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new LimitNode(PlanNodeAllocator.generateId(), this.limit);
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    LimitNode root = (LimitNode) this.clone();
+    root.setChild(children.get(0));
+    return root;
   }
 
   @Override
@@ -71,4 +78,12 @@ public class LimitNode extends ProcessNode {
   public PlanNode getChild() {
     return child;
   }
+
+  public void setChild(PlanNode child) {
+    this.child = child;
+  }
+
+  public String toString() {
+    return "LimitNode-" + this.getId();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 4886055..907c91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.common.OrderBy;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -47,13 +49,21 @@ public class TimeJoinNode extends ProcessNode {
   private List<PlanNode> children;
 
   public TimeJoinNode(
+          PlanNodeId id,
+          OrderBy mergeOrder,
+          FilterNullPolicy filterNullPolicy) {
+    super(id);
+    this.mergeOrder = mergeOrder;
+    this.filterNullPolicy = filterNullPolicy;
+    this.children = new ArrayList<>();
+  }
+
+  public TimeJoinNode(
       PlanNodeId id,
       OrderBy mergeOrder,
       FilterNullPolicy filterNullPolicy,
       List<PlanNode> children) {
-    super(id);
-    this.mergeOrder = mergeOrder;
-    this.filterNullPolicy = filterNullPolicy;
+    this(id, mergeOrder, filterNullPolicy);
     this.children = children;
   }
 
@@ -64,12 +74,14 @@ public class TimeJoinNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new TimeJoinNode(PlanNodeAllocator.generateId(), this.mergeOrder, this.filterNullPolicy);
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    TimeJoinNode node = (TimeJoinNode) this.clone();
+    node.setChildren(children);
+    return node;
   }
 
   @Override
@@ -99,4 +111,9 @@ public class TimeJoinNode extends ProcessNode {
   public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
     this.filterNullPolicy = filterNullPolicy;
   }
+
+  public String toString() {
+    return "TimeJoinNode-" + this.getId();
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 939e2d4..f6ab1c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.DataRegion;
 import org.apache.iotdb.db.mpp.common.OrderBy;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -102,12 +103,12 @@ public class SeriesScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new SeriesScanNode(PlanNodeAllocator.generateId(), this.getSeriesPath());
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    return this.clone();
   }
 
   @Override
@@ -135,4 +136,9 @@ public class SeriesScanNode extends SourceNode {
   public DataRegion getDataRegion() {
     return dataRegion;
   }
+
+  public String toString() {
+    return String.format("SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+            this.getId(), this.getSeriesPath(), this.getDataRegion());
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
new file mode 100644
index 0000000..3a222fa
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributionPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class DistributionPlannerTest {
+
+    @Test
+    public void TestRewriteSourceNode() throws IllegalPathException {
+        TimeJoinNode timeJoinNode = new TimeJoinNode(PlanNodeAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+        timeJoinNode.addChild(new SeriesScanNode(PlanNodeAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+        timeJoinNode.addChild(new SeriesScanNode(PlanNodeAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+        timeJoinNode.addChild(new SeriesScanNode(PlanNodeAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+
+        LimitNode root = new LimitNode(PlanNodeAllocator.generateId(), 10, timeJoinNode);
+
+        Analysis analysis = constructAnalysis();
+
+        DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(new QueryContext(), root));
+        PlanNode newRoot = planner.rewriteSource();
+
+        System.out.println("\nLogical-Plan:");
+        System.out.println("------------------");
+        PlanNodeUtil.printPlanNode(root);
+        System.out.println("\nDistributed-Plan:");
+        System.out.println("------------------");
+        PlanNodeUtil.printPlanNode(newRoot);
+        assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
+        assertEquals(newRoot.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
+        assertEquals(newRoot.getChildren().get(0).getChildren().get(1).getChildren().size(), 2);
+    }
+
+    private Analysis constructAnalysis() {
+        Analysis analysis = new Analysis();
+        Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo = new HashMap<>();
+        List<DataRegion> d1DataRegions = new ArrayList<>();
+        d1DataRegions.add(new DataRegion(1, "192.0.0.1"));
+        d1DataRegions.add(new DataRegion(2, "192.0.0.1"));
+        Map<DataRegionTimeSlice, List<DataRegion>> d1DataRegionMap = new HashMap<>();
+        d1DataRegionMap.put(new DataRegionTimeSlice(), d1DataRegions);
+
+        List<DataRegion> d2DataRegions = new ArrayList<>();
+        d2DataRegions.add(new DataRegion(3, "192.0.0.1"));
+        Map<DataRegionTimeSlice, List<DataRegion>> d2DataRegionMap = new HashMap<>();
+        d2DataRegionMap.put(new DataRegionTimeSlice(), d2DataRegions);
+
+        dataPartitionInfo.put("root.sg.d1", d1DataRegionMap);
+        dataPartitionInfo.put("root.sg.d2", d2DataRegionMap);
+
+        analysis.setDataPartitionInfo(dataPartitionInfo);
+        return analysis;
+    }
+}