You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/20 08:06:34 UTC
[iotdb] branch xingtanzjr/logical_to_distributed updated: complete source rewriter for distributed plan
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/logical_to_distributed
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/logical_to_distributed by this push:
new b17c657 complete source rewriter for distributed plan
b17c657 is described below
commit b17c6579a78e450ab7ed0be3a29a977a6c0ffd6d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sun Mar 20 16:05:47 2022 +0800
complete source rewriter for distributed plan
---
.../org/apache/iotdb/db/mpp/common/Analysis.java | 4 +
.../org/apache/iotdb/db/mpp/common/DataRegion.java | 9 +++
.../iotdb/db/mpp/common/FilterNullPolicy.java | 1 +
.../sql/planner/optimization/PlanOptimizer.java | 2 +-
.../mpp/sql/planner/plan/DistributionPlanner.java | 9 ++-
.../db/mpp/sql/planner/plan/LogicalQueryPlan.java | 13 ++++
.../planner/plan/node/PlanNodeAllocator.java} | 14 ++--
.../planner/plan/node/PlanNodeUtil.java} | 28 +++----
.../sql/planner/plan/node/process/LimitNode.java | 25 ++++--
.../planner/plan/node/process/TimeJoinNode.java | 27 +++++--
.../planner/plan/node/source/SeriesScanNode.java | 10 ++-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 91 ++++++++++++++++++++++
12 files changed, 199 insertions(+), 34 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
index 08fbc45..85ff111 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
@@ -53,4 +53,8 @@ public class Analysis {
return null;
}
}
+
+ public void setDataPartitionInfo(Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo) {
+ this.dataPartitionInfo = dataPartitionInfo;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
index a46afdf..5618e1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
@@ -27,6 +27,11 @@ public class DataRegion {
private Integer dataRegionId;
private String endpoint;
+ public DataRegion(Integer dataRegionId, String endpoint) {
+ this.dataRegionId = dataRegionId;
+ this.endpoint = endpoint;
+ }
+
public int hashCode() {
return dataRegionId.hashCode();
}
@@ -37,4 +42,8 @@ public class DataRegion {
}
return false;
}
+
+ public String toString() {
+ return String.format("%s/%d",this.endpoint, this.dataRegionId);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
index 7979b27..9de26fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.common;
public enum FilterNullPolicy {
+ NO_FILTER,
CONTAINS_NULL,
ALL_NULL
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
index f98f014..ff2baae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
@@ -23,5 +23,5 @@ import org.apache.iotdb.db.mpp.common.TsBlock;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
public interface PlanOptimizer {
- PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
+ PlanNode optimize(PlanNode plan, QueryContext context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index d9fa64c..9c55a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -37,6 +37,11 @@ public class DistributionPlanner {
this.logicalPlan = logicalPlan;
}
+ public PlanNode rewriteSource() {
+ SourceRewriter rewriter = new SourceRewriter();
+ return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext());
+ }
+
public DistributedQueryPlan planFragments() {
return null;
}
@@ -64,7 +69,7 @@ public class DistributionPlanner {
} else {
// In a general logical query plan, the children of TimeJoinNode should only be SeriesScanNode or SeriesAggregateScanNode
// So this branch should not be touched.
- root.addChild(generateDistributedPlan(child, context));
+ root.addChild(visit(child, context));
}
}
@@ -93,7 +98,7 @@ public class DistributionPlanner {
return root;
}
- public PlanNode generateDistributedPlan(PlanNode node, DistributionPlanContext context) {
+ public PlanNode visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
index 666fbf3..788746a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
@@ -28,4 +28,17 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
public class LogicalQueryPlan {
private QueryContext context;
private PlanNode rootNode;
+
+ public LogicalQueryPlan(QueryContext context, PlanNode rootNode) {
+ this.context = context;
+ this.rootNode = rootNode;
+ }
+
+ public PlanNode getRootNode() {
+ return rootNode;
+ }
+
+ public QueryContext getContext() {
+ return context;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeAllocator.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeAllocator.java
index 7979b27..edac517 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeAllocator.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,9 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
-public enum FilterNullPolicy {
- CONTAINS_NULL,
- ALL_NULL
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+public class PlanNodeAllocator {
+ public static int initialId = 0;
+ public static synchronized PlanNodeId generateId() {
+ initialId++;
+ return new PlanNodeId(String.valueOf(initialId));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index a46afdf..f174b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -17,24 +17,24 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-/**
- * This class is used to represent the data partition info including the DataRegionId and physical node IP address
- */
-//TODO: (xingtanzjr) This class should be substituted with the class defined in Consensus level
-public class DataRegion {
- private Integer dataRegionId;
- private String endpoint;
+public class PlanNodeUtil {
+ public static void printPlanNode(PlanNode root) {
+ printPlanNodeWithLevel(root, 0);
+ }
- public int hashCode() {
- return dataRegionId.hashCode();
+ private static void printPlanNodeWithLevel(PlanNode root, int level) {
+ printTab(level);
+ System.out.println(root.toString());
+ for (PlanNode child : root.getChildren()) {
+ printPlanNodeWithLevel(child, level + 1);
+ }
}
- public boolean equals(Object obj) {
- if (obj instanceof DataRegion) {
- return this.dataRegionId.equals(((DataRegion)obj).dataRegionId);
+ private static void printTab(int count) {
+ for(int i = 0 ; i < count; i ++) {
+ System.out.print("\t");
}
- return false;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 5c5b1bc..06a5848 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -30,12 +31,16 @@ import java.util.List;
public class LimitNode extends ProcessNode {
// The limit count
- private final int limit;
- private final PlanNode child;
+ private int limit;
+ private PlanNode child;
- public LimitNode(PlanNodeId id, int limit, PlanNode child) {
+ public LimitNode(PlanNodeId id, int limit) {
super(id);
this.limit = limit;
+ }
+
+ public LimitNode(PlanNodeId id, int limit, PlanNode child) {
+ this(id, limit);
this.child = child;
}
@@ -46,12 +51,14 @@ public class LimitNode extends ProcessNode {
@Override
public PlanNode clone() {
- return null;
+ return new LimitNode(PlanNodeAllocator.generateId(), this.limit);
}
@Override
public PlanNode cloneWithChildren(List<PlanNode> children) {
- return null;
+ LimitNode root = (LimitNode) this.clone();
+ root.setChild(children.get(0));
+ return root;
}
@Override
@@ -71,4 +78,12 @@ public class LimitNode extends ProcessNode {
public PlanNode getChild() {
return child;
}
+
+ public void setChild(PlanNode child) {
+ this.child = child;
+ }
+
+ public String toString() {
+ return "LimitNode-" + this.getId();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 4886055..907c91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
import org.apache.iotdb.db.mpp.common.OrderBy;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,13 +49,21 @@ public class TimeJoinNode extends ProcessNode {
private List<PlanNode> children;
public TimeJoinNode(
+ PlanNodeId id,
+ OrderBy mergeOrder,
+ FilterNullPolicy filterNullPolicy) {
+ super(id);
+ this.mergeOrder = mergeOrder;
+ this.filterNullPolicy = filterNullPolicy;
+ this.children = new ArrayList<>();
+ }
+
+ public TimeJoinNode(
PlanNodeId id,
OrderBy mergeOrder,
FilterNullPolicy filterNullPolicy,
List<PlanNode> children) {
- super(id);
- this.mergeOrder = mergeOrder;
- this.filterNullPolicy = filterNullPolicy;
+ this(id, mergeOrder, filterNullPolicy);
this.children = children;
}
@@ -64,12 +74,14 @@ public class TimeJoinNode extends ProcessNode {
@Override
public PlanNode clone() {
- return null;
+ return new TimeJoinNode(PlanNodeAllocator.generateId(), this.mergeOrder, this.filterNullPolicy);
}
@Override
public PlanNode cloneWithChildren(List<PlanNode> children) {
- return null;
+ TimeJoinNode node = (TimeJoinNode) this.clone();
+ node.setChildren(children);
+ return node;
}
@Override
@@ -99,4 +111,9 @@ public class TimeJoinNode extends ProcessNode {
public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
this.filterNullPolicy = filterNullPolicy;
}
+
+ public String toString() {
+ return "TimeJoinNode-" + this.getId();
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 939e2d4..f6ab1c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.DataRegion;
import org.apache.iotdb.db.mpp.common.OrderBy;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -102,12 +103,12 @@ public class SeriesScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return null;
+ return new SeriesScanNode(PlanNodeAllocator.generateId(), this.getSeriesPath());
}
@Override
public PlanNode cloneWithChildren(List<PlanNode> children) {
- return null;
+ return this.clone();
}
@Override
@@ -135,4 +136,9 @@ public class SeriesScanNode extends SourceNode {
public DataRegion getDataRegion() {
return dataRegion;
}
+
+ public String toString() {
+ return String.format("SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+ this.getId(), this.getSeriesPath(), this.getDataRegion());
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
new file mode 100644
index 0000000..3a222fa
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributionPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class DistributionPlannerTest {
+
+ @Test
+ public void TestRewriteSourceNode() throws IllegalPathException {
+ TimeJoinNode timeJoinNode = new TimeJoinNode(PlanNodeAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+ timeJoinNode.addChild(new SeriesScanNode(PlanNodeAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ timeJoinNode.addChild(new SeriesScanNode(PlanNodeAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ timeJoinNode.addChild(new SeriesScanNode(PlanNodeAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+
+ LimitNode root = new LimitNode(PlanNodeAllocator.generateId(), 10, timeJoinNode);
+
+ Analysis analysis = constructAnalysis();
+
+ DistributionPlanner planner = new DistributionPlanner(analysis, new LogicalQueryPlan(new QueryContext(), root));
+ PlanNode newRoot = planner.rewriteSource();
+
+ System.out.println("\nLogical-Plan:");
+ System.out.println("------------------");
+ PlanNodeUtil.printPlanNode(root);
+ System.out.println("\nDistributed-Plan:");
+ System.out.println("------------------");
+ PlanNodeUtil.printPlanNode(newRoot);
+ assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
+ assertEquals(newRoot.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
+ assertEquals(newRoot.getChildren().get(0).getChildren().get(1).getChildren().size(), 2);
+ }
+
+ private Analysis constructAnalysis() {
+ Analysis analysis = new Analysis();
+ Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo = new HashMap<>();
+ List<DataRegion> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(new DataRegion(1, "192.0.0.1"));
+ d1DataRegions.add(new DataRegion(2, "192.0.0.1"));
+ Map<DataRegionTimeSlice, List<DataRegion>> d1DataRegionMap = new HashMap<>();
+ d1DataRegionMap.put(new DataRegionTimeSlice(), d1DataRegions);
+
+ List<DataRegion> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(new DataRegion(3, "192.0.0.1"));
+ Map<DataRegionTimeSlice, List<DataRegion>> d2DataRegionMap = new HashMap<>();
+ d2DataRegionMap.put(new DataRegionTimeSlice(), d2DataRegions);
+
+ dataPartitionInfo.put("root.sg.d1", d1DataRegionMap);
+ dataPartitionInfo.put("root.sg.d2", d2DataRegionMap);
+
+ analysis.setDataPartitionInfo(dataPartitionInfo);
+ return analysis;
+ }
+}