You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/28 12:43:43 UTC

[iotdb] branch xingtanzjr/polish_node updated (59c9d25 -> ec819b5)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


 discard 59c9d25  fix the bug in distribution planner
     add 27f2b96  [IOTDB-2713] Generate Analysis for query statement (#5343)
     new bf45a01  fix the bug in distribution planner
     new ec819b5  polish implement of PlanNode

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (59c9d25)
            \
             N -- N -- N   refs/heads/xingtanzjr/polish_node (ec819b5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/mpp/common/schematree/PathPatternNode.java  |  68 ++++-
 .../db/mpp/common/schematree/PathPatternTree.java  | 104 ++++++-
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |  19 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  25 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  50 ++-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |  11 -
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   8 -
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |  11 -
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |   2 +-
 .../db/mpp/sql/planner/plan/PlanFragment.java      |   2 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |  25 +-
 .../plan/node/metedata/read/ShowDevicesNode.java   |   9 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |   9 +-
 .../write/CreateAlignedTimeSeriesNode.java         |   9 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |   9 +-
 .../planner/plan/node/process/AggregateNode.java   |  12 +-
 .../planner/plan/node/process/DeviceMergeNode.java |  21 +-
 .../planner/plan/node/process/ExchangeNode.java    |  14 +-
 .../sql/planner/plan/node/process/FillNode.java    |  20 +-
 .../sql/planner/plan/node/process/FilterNode.java  |  20 +-
 .../planner/plan/node/process/FilterNullNode.java  |  15 +-
 .../plan/node/process/GroupByLevelNode.java        |  11 +-
 .../sql/planner/plan/node/process/LimitNode.java   |  10 +-
 .../sql/planner/plan/node/process/OffsetNode.java  |  21 +-
 .../sql/planner/plan/node/process/SortNode.java    |  20 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  10 +-
 .../sql/planner/plan/node/sink/CsvSinkNode.java    |  67 ----
 .../planner/plan/node/sink/FragmentSinkNode.java   |  15 +-
 .../sql/planner/plan/node/sink/ThriftSinkNode.java |  69 -----
 .../planner/plan/node/source/CsvSourceNode.java    |  80 -----
 .../plan/node/source/SeriesAggregateScanNode.java  |   9 +-
 .../planner/plan/node/source/SeriesScanNode.java   |   6 +-
 .../plan/node/write/InsertMultiTabletNode.java     |   9 +-
 .../sql/planner/plan/node/write/InsertRowNode.java |   9 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   9 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   9 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   9 +-
 .../sql/rewriter/ColumnPaginationController.java   | 101 ++++++
 .../db/mpp/sql/rewriter/ConcatPathRewriter.java    | 309 +++----------------
 .../db/mpp/sql/rewriter/IStatementRewriter.java    |  33 --
 .../db/mpp/sql/rewriter/WildcardsRemover.java      | 339 +++++++++++++++++----
 .../iotdb/db/mpp/sql/statement/StatementNode.java  |   2 -
 .../sql/{tree => statement}/StatementVisitor.java  |   4 +-
 .../mpp/sql/statement/component/ResultColumn.java  |   8 +-
 .../sql/statement/component/SelectComponent.java   |  17 +-
 .../db/mpp/sql/statement/crud/InsertStatement.java |   2 +-
 .../db/mpp/sql/statement/crud/QueryStatement.java  |   2 +-
 .../metadata/AlterTimeSeriesStatement.java         |   2 +-
 .../metadata/CreateAlignedTimeSeriesStatement.java |   2 +-
 .../metadata/CreateTimeSeriesStatement.java        |   2 +-
 .../apache/iotdb/db/mpp/sql/tree/Expression.java   |  21 --
 .../iotdb/db/query/expression/Expression.java      |   7 +
 .../query/expression/binary/BinaryExpression.java  |  15 +
 .../db/query/expression/unary/ConstantOperand.java |   9 +
 .../query/expression/unary/FunctionExpression.java |  26 +-
 .../query/expression/unary/LogicNotExpression.java |  13 +
 .../query/expression/unary/NegationExpression.java |  13 +
 .../query/expression/unary/TimeSeriesOperand.java  |  15 +-
 .../iotdb/db/mpp/common/PathPatternTreeTest.java   | 131 ++++++++
 59 files changed, 1116 insertions(+), 783 deletions(-)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/IStatementRewriter.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/sql/{tree => statement}/StatementVisitor.java (95%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/mpp/common/PathPatternTreeTest.java

[iotdb] 02/02: polish implement of PlanNode

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec819b53077088a526be1acdb695cd132165d7a2
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 28 20:42:16 2022 +0800

    polish implement of PlanNode
---
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  2 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     | 25 ++++++-
 .../plan/node/metedata/read/ShowDevicesNode.java   |  9 +--
 .../node/metedata/write/AlterTimeSeriesNode.java   |  9 +--
 .../write/CreateAlignedTimeSeriesNode.java         |  9 +--
 .../node/metedata/write/CreateTimeSeriesNode.java  |  9 +--
 .../planner/plan/node/process/AggregateNode.java   | 12 ++--
 .../planner/plan/node/process/DeviceMergeNode.java | 21 ++++--
 .../planner/plan/node/process/ExchangeNode.java    | 14 ++--
 .../sql/planner/plan/node/process/FillNode.java    | 20 +++---
 .../sql/planner/plan/node/process/FilterNode.java  | 20 ++++--
 .../planner/plan/node/process/FilterNullNode.java  | 15 ++--
 .../plan/node/process/GroupByLevelNode.java        | 11 +--
 .../sql/planner/plan/node/process/LimitNode.java   | 10 +--
 .../sql/planner/plan/node/process/OffsetNode.java  | 21 ++++--
 .../sql/planner/plan/node/process/SortNode.java    | 20 ++++--
 .../planner/plan/node/process/TimeJoinNode.java    | 10 +--
 .../sql/planner/plan/node/sink/CsvSinkNode.java    | 67 ------------------
 .../planner/plan/node/sink/FragmentSinkNode.java   | 15 ++--
 .../sql/planner/plan/node/sink/ThriftSinkNode.java | 69 -------------------
 .../planner/plan/node/source/CsvSourceNode.java    | 80 ----------------------
 .../plan/node/source/SeriesAggregateScanNode.java  |  9 +--
 .../planner/plan/node/source/SeriesScanNode.java   |  6 +-
 .../plan/node/write/InsertMultiTabletNode.java     |  9 +--
 .../sql/planner/plan/node/write/InsertRowNode.java |  9 +--
 .../planner/plan/node/write/InsertRowsNode.java    |  9 +--
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  9 +--
 .../planner/plan/node/write/InsertTabletNode.java  |  9 +--
 28 files changed, 191 insertions(+), 337 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index ed75a0a..f2a7132 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -98,7 +98,7 @@ public class PlanFragment {
     PlanNode root = PlanNodeType.deserialize(byteBuffer);
     int childrenCount = byteBuffer.getInt();
     for (int i = 0; i < childrenCount; i++) {
-      root.addChildren(deserializeHelper(byteBuffer));
+      root.addChild(deserializeHelper(byteBuffer));
     }
     return root;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 7d99a38..ac7adee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.commons.lang.Validate;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -26,8 +28,11 @@ import static java.util.Objects.requireNonNull;
 /** The base class of query executable operators, which is used to compose logical query plan. */
 // TODO: consider how to restrict the children type for each type of ExecOperator
 public abstract class PlanNode {
+  protected static final int NO_CHILD_ALLOWED = 0;
+  protected static final int ONE_CHILD = 1;
+  protected static final int CHILD_COUNT_NO_LIMIT = -1;
 
-  private PlanNodeId id;
+  private final PlanNodeId id;
 
   protected PlanNode(PlanNodeId id) {
     requireNonNull(id, "id is null");
@@ -40,11 +45,25 @@ public abstract class PlanNode {
 
   public abstract List<PlanNode> getChildren();
 
-  public abstract void addChildren(PlanNode child);
+  public abstract void addChild(PlanNode child);
 
   public abstract PlanNode clone();
 
-  public abstract PlanNode cloneWithChildren(List<PlanNode> children);
+  public PlanNode cloneWithChildren(List<PlanNode> children) {
+    Validate.isTrue(children == null || allowedChildCount() == CHILD_COUNT_NO_LIMIT || children.size() == allowedChildCount(),
+            String.format("Child count is not correct for PlanNode. Expected: %d, Value: %d", allowedChildCount(), getChildrenCount(children)));
+    PlanNode node = clone();
+    if (children != null) {
+      children.forEach(node::addChild);
+    }
+    return node;
+  }
+
+  private int getChildrenCount(List<PlanNode> children) {
+    return children == null ? 0 : children.size();
+  }
+
+  public abstract int allowedChildCount();
 
   public abstract List<String> getOutputColumnNames();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
index 90f4023..c00130b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -36,16 +37,16 @@ public class ShowDevicesNode extends ShowNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("Clone of ShowDevicesNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index cf4ddaa..7e426bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -117,16 +118,16 @@ public class AlterTimeSeriesNode extends PlanNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("Clone of AlterTimeSeriesNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index a8252f0..c24cf8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -142,16 +143,16 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("Clone of CreateAlignedTimeSeriesNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 7458e98..5ba57ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -143,16 +144,16 @@ public class CreateTimeSeriesNode extends PlanNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("Clone of CreateTimeSeriesNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 5a9e23b..841978b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -64,16 +66,18 @@ public class AggregateNode extends ProcessNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    throw new NotImplementedException("addChild of AggregateNode is not implemented");
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("Clone of AggregateNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 96dc317..68300eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -57,6 +58,12 @@ public class DeviceMergeNode extends ProcessNode {
 
   public DeviceMergeNode(PlanNodeId id) {
     super(id);
+    this.children = new ArrayList<>();
+  }
+
+  public DeviceMergeNode(PlanNodeId id, OrderBy mergeOrder) {
+    this(id);
+    this.mergeOrder = mergeOrder;
   }
 
   @Override
@@ -65,16 +72,18 @@ public class DeviceMergeNode extends ProcessNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.children.add(child);
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new DeviceMergeNode(getId(), mergeOrder);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
   }
 
   @Override
@@ -104,4 +113,8 @@ public class DeviceMergeNode extends ProcessNode {
     this.childDeviceNodeMap.put(deviceName, childNode);
     this.children.add(childNode);
   }
+
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 0df6d36..3e016be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -32,6 +32,8 @@ import java.util.List;
 
 public class ExchangeNode extends PlanNode {
   private PlanNode child;
+  // The remoteSourceNode is used to record the remote source info for current ExchangeNode
+  // It is not the child of current ExchangeNode
   private FragmentSinkNode remoteSourceNode;
 
   // In current version, one ExchangeNode will only have one source.
@@ -54,7 +56,9 @@ public class ExchangeNode extends PlanNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
@@ -68,12 +72,8 @@ public class ExchangeNode extends PlanNode {
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    ExchangeNode node = (ExchangeNode) clone();
-    if (children != null && children.size() > 0) {
-      node.setChild(children.get(0));
-    }
-    return node;
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
   }
 
   public void setUpstream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 99a2718..cd37d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.commons.lang.Validate;
 import org.apache.iotdb.db.mpp.common.FillPolicy;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -40,22 +41,29 @@ public class FillNode extends ProcessNode {
     super(id);
   }
 
+  public FillNode(PlanNodeId id, FillPolicy policy) {
+    this(id);
+    this.fillPolicy = policy;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return ImmutableList.of(child);
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new FillNode(getId(), fillPolicy);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
@@ -75,8 +83,4 @@ public class FillNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
-    this(id);
-    this.fillPolicy = fillPolicy;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 123948e..f6c7202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -31,33 +31,39 @@ import java.util.List;
 /** The FilterNode is responsible to filter the RowRecord from TsBlock. */
 public class FilterNode extends ProcessNode {
 
-  private final PlanNode child;
+  private PlanNode child;
   // TODO we need to rename it to something like expression in order to distinguish from Operator
   // class
   private final FilterOperator predicate;
 
-  public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+  public FilterNode(PlanNodeId id, FilterOperator predicate) {
     super(id);
-    this.child = child;
     this.predicate = predicate;
   }
 
+  public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+    this(id, predicate);
+    this.child = child;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return ImmutableList.of(child);
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new FilterNode(getId(), predicate);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index c51cecb..e856f2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -43,6 +43,11 @@ public class FilterNullNode extends ProcessNode {
     this.child = child;
   }
 
+  public FilterNullNode(PlanNodeId id, FilterNullPolicy policy) {
+    super(id);
+    this.discardPolicy = policy;
+  }
+
   public FilterNullNode(PlanNodeId id, PlanNode child, List<String> filterNullColumnNames) {
     super(id);
     this.child = child;
@@ -55,16 +60,18 @@ public class FilterNullNode extends ProcessNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new FilterNullNode(getId(), discardPolicy);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index a8a3629..b48470f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -57,16 +58,18 @@ public class GroupByLevelNode extends ProcessNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    throw new NotImplementedException("addChild of GroupByLevelNode is not implemented");
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("Clone of GroupByLevelNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 0771572..42ad60e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -51,7 +51,9 @@ public class LimitNode extends ProcessNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
@@ -59,10 +61,8 @@ public class LimitNode extends ProcessNode {
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    LimitNode root = (LimitNode) this.clone();
-    root.setChild(children.get(0));
-    return root;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 734c0d9..c33814a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import com.sun.scenario.effect.Offset;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -32,31 +33,37 @@ import java.util.List;
 public class OffsetNode extends ProcessNode {
 
   // The limit count
-  private final PlanNode child;
+  private PlanNode child;
   private final int offset;
 
-  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
+  public OffsetNode(PlanNodeId id, int offset) {
     super(id);
-    this.child = child;
     this.offset = offset;
   }
 
+  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
+    this(id, offset);
+    this.child = child;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return null;
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new OffsetNode(getId(), offset);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index c3073d4..893a289 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -34,35 +34,41 @@ import java.util.List;
  */
 public class SortNode extends ProcessNode {
 
-  private final PlanNode child;
+  private PlanNode child;
 
   private final List<String> orderBy;
 
   private OrderBy sortOrder;
 
-  public SortNode(PlanNodeId id, PlanNode child, List<String> orderBy, OrderBy sortOrder) {
+  public SortNode(PlanNodeId id, List<String> orderBy, OrderBy sortOrder) {
     super(id);
-    this.child = child;
     this.orderBy = orderBy;
     this.sortOrder = sortOrder;
   }
 
+  public SortNode(PlanNodeId id, PlanNode child, List<String> orderBy, OrderBy sortOrder) {
+    this(id, orderBy, sortOrder);
+    this.child = child;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return ImmutableList.of(child);
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new SortNode(getId(), orderBy, sortOrder);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index a0ec54e..d38cb58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -71,19 +71,14 @@ public class TimeJoinNode extends ProcessNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
-
-  @Override
   public PlanNode clone() {
     return new TimeJoinNode(
         PlanNodeIdAllocator.generateId(), this.mergeOrder, this.filterNullPolicy);
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    TimeJoinNode node = (TimeJoinNode) this.clone();
-    node.setChildren(children);
-    return node;
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
   }
 
   @Override
@@ -105,6 +100,7 @@ public class TimeJoinNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
+  @Override
   public void addChild(PlanNode child) {
     this.children.add(child);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
deleted file mode 100644
index 78cbc7a..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class CsvSinkNode extends SinkNode {
-  public CsvSinkNode(PlanNodeId id) {
-    super(id);
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return null;
-  }
-
-  @Override
-  public void addChildren(PlanNode child) {}
-
-  @Override
-  public PlanNode clone() {
-    return null;
-  }
-
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
-  public static CsvSinkNode deserialize(ByteBuffer byteBuffer) {
-    return null;
-  }
-
-  @Override
-  public void serialize(ByteBuffer byteBuffer) {}
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public void send() {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index aeb129f..0189e82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -48,7 +48,9 @@ public class FragmentSinkNode extends SinkNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
 
   @Override
   public PlanNode clone() {
@@ -59,15 +61,8 @@ public class FragmentSinkNode extends SinkNode {
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    Validate.isTrue(
-        children == null || children.size() == 1,
-        "Children size of FragmentSinkNode should be 0 or 1");
-    FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
-    if (children != null) {
-      sinkNode.setChild(children.get(0));
-    }
-    return sinkNode;
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
deleted file mode 100644
index 173ed11..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/** not implemented in current IoTDB yet */
-public class ThriftSinkNode extends SinkNode {
-
-  public ThriftSinkNode(PlanNodeId id) {
-    super(id);
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return null;
-  }
-
-  @Override
-  public void addChildren(PlanNode child) {}
-
-  @Override
-  public PlanNode clone() {
-    return null;
-  }
-
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
-  public static ThriftSinkNode deserialize(ByteBuffer byteBuffer) {
-    return null;
-  }
-
-  @Override
-  public void serialize(ByteBuffer byteBuffer) {}
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public void send() {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
deleted file mode 100644
index e28c4d1..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/** Not implemented in current version. */
-public class CsvSourceNode extends SourceNode {
-
-  public CsvSourceNode(PlanNodeId id) {
-    super(id);
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return null;
-  }
-
-  @Override
-  public void addChildren(PlanNode child) {}
-
-  @Override
-  public PlanNode clone() {
-    return null;
-  }
-
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
-  public static CsvSourceNode deserialize(ByteBuffer byteBuffer) {
-    return null;
-  }
-
-  @Override
-  public void serialize(ByteBuffer byteBuffer) {}
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public void open() throws Exception {}
-
-  @Override
-  public DataRegionReplicaSet getDataRegionReplicaSet() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 0a45487d..4ee5e28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import com.google.common.collect.ImmutableList;
@@ -74,16 +75,16 @@ public class SeriesAggregateScanNode extends SourceNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("clone of SeriesAggregateScanNode is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index cd33b51..3ffaf23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -117,7 +117,7 @@ public class SeriesScanNode extends SourceNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
@@ -125,8 +125,8 @@ public class SeriesScanNode extends SourceNode {
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return this.clone();
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
index 5fea515..48bd2f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -42,16 +43,16 @@ public class InsertMultiTabletNode extends InsertNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("clone of Insert is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 585000c..e881ff1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -42,16 +43,16 @@ public class InsertRowNode extends InsertNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("clone of Insert is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index d061de6..1f3b6c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -37,16 +38,16 @@ public class InsertRowsNode extends InsertNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("clone of Insert is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 34b6e79..0c8d78f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -37,16 +38,16 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("clone of Insert is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index f9decf4..f1e4744 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import java.nio.ByteBuffer;
@@ -56,16 +57,16 @@ public class InsertTabletNode extends InsertNode {
   }
 
   @Override
-  public void addChildren(PlanNode child) {}
+  public void addChild(PlanNode child) {}
 
   @Override
   public PlanNode clone() {
-    return null;
+    throw new NotImplementedException("clone of Insert is not implemented");
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override

[iotdb] 01/02: fix the bug in distribution planner

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bf45a011333ca8eb602c404eaec234ac339ff3c8
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 28 17:44:29 2022 +0800

    fix the bug in distribution planner
---
 .../sql/planner/plan/SimpleFragmentParallelPlanner.java   |  3 ++-
 .../mpp/sql/planner/plan/node/process/ExchangeNode.java   |  4 +++-
 .../mpp/sql/planner/plan/node/sink/FragmentSinkNode.java  | 15 +++++++++++++--
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 3603310..1be8d95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -72,7 +72,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // one by one
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
-    FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+    FragmentInstance fragmentInstance =
+        new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
 
     // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
     // of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 80bbf41..0df6d36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -60,7 +60,9 @@ public class ExchangeNode extends PlanNode {
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getId());
     if (remoteSourceNode != null) {
-      node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+      FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
+      remoteSourceNodeClone.setDownStreamNode(node);
+      node.setRemoteSourceNode(remoteSourceNode);
     }
     return node;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 3ab12c7..aeb129f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.Validate;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -51,12 +52,22 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+    sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
+    sinkNode.setDownStreamNode(downStreamNode);
+    return sinkNode;
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    Validate.isTrue(
+        children == null || children.size() == 1,
+        "Children size of FragmentSinkNode should be 0 or 1");
+    FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
+    if (children != null) {
+      sinkNode.setChild(children.get(0));
+    }
+    return sinkNode;
   }
 
   @Override