You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/09 04:09:55 UTC

[iotdb] branch master updated: [IOTDB-2803]add AlterTimeSeriesNode and CreateAlignedTimeSeriesNode to PlanNodeType and its serialize and deserialize (#5444)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 931e309ad8 [IOTDB-2803]add AlterTimeSeriesNode and CreateAlignedTimeSeriesNode to PlanNodeType and its serialize and deserialize (#5444)
931e309ad8 is described below

commit 931e309ad8f55f4578c0c74ef241cc024177dc58
Author: Yifu Zhou <ef...@outlook.com>
AuthorDate: Sat Apr 9 12:09:51 2022 +0800

    [IOTDB-2803]add AlterTimeSeriesNode and CreateAlignedTimeSeriesNode to PlanNodeType and its serialize and deserialize (#5444)
---
 .../db/mpp/sql/planner/DistributionPlanner.java    |  26 +--
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  12 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |   1 -
 .../db/mpp/sql/planner/plan/PlanFragment.java      |   7 +-
 .../plan/SimpleFragmentParallelPlanner.java        |   4 +-
 .../iotdb/db/mpp/sql/planner/plan/SubPlan.java     |   3 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   4 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  13 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   | 129 +++++++++++++-
 .../write/CreateAlignedTimeSeriesNode.java         | 192 ++++++++++++++++++++-
 .../planner/plan/node/process/DeviceMergeNode.java |   4 +-
 .../planner/plan/node/process/ExchangeNode.java    |   7 +-
 .../sql/planner/plan/node/process/FillNode.java    |   4 +-
 .../sql/planner/plan/node/process/FilterNode.java  |   4 +-
 .../planner/plan/node/process/FilterNullNode.java  |   4 +-
 .../plan/node/process/GroupByLevelNode.java        |   2 +-
 .../sql/planner/plan/node/process/LimitNode.java   |   6 +-
 .../sql/planner/plan/node/process/OffsetNode.java  |   4 +-
 .../sql/planner/plan/node/process/SortNode.java    |   4 +-
 .../planner/plan/node/process/TimeJoinNode.java    |   6 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |   5 +-
 .../plan/node/source/SeriesAggregateScanNode.java  |   2 +-
 .../planner/plan/node/source/SeriesScanNode.java   |   6 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   2 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  65 +++++++
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  24 ++-
 26 files changed, 473 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index bb80cd0dbb..182f85fbe5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -105,7 +105,8 @@ public class DistributionPlanner {
     sinkNode.setChild(rootInstance.getFragment().getRoot());
     context
         .getResultNodeContext()
-        .setUpStream(rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getId());
+        .setUpStream(
+            rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getPlanNodeId());
     rootInstance.getFragment().setRoot(sinkNode);
   }
 
@@ -137,7 +138,7 @@ public class DistributionPlanner {
           // SeriesScanNode.
           for (RegionReplicaSet dataRegion : dataDistribution) {
             SeriesScanNode split = (SeriesScanNode) handle.clone();
-            split.setId(context.queryContext.getQueryId().genPlanNodeId());
+            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
             split.setDataRegionReplicaSet(dataRegion);
             sources.add(split);
           }
@@ -169,7 +170,7 @@ public class DistributionPlanner {
               // We clone a TimeJoinNode from root to make the params to be consistent.
               // But we need to assign a new ID to it
               TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
-              root.setId(context.queryContext.getQueryId().genPlanNodeId());
+              root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
               seriesScanNodes.forEach(parentOfGroup::addChild);
               root.addChild(parentOfGroup);
             }
@@ -207,21 +208,22 @@ public class DistributionPlanner {
       // PlanNode, we need to process
       // them with special method
       context.putNodeDistribution(
-          node.getId(), new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, null));
+          node.getPlanNodeId(),
+          new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, null));
 
       return node.cloneWithChildren(children);
     }
 
     public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
-          node.getId(),
+          node.getPlanNodeId(),
           new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
       return node.clone();
     }
 
     public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
-          node.getId(),
+          node.getPlanNodeId(),
           new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
       return node.clone();
     }
@@ -241,7 +243,7 @@ public class DistributionPlanner {
               ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
               : NodeDistributionType.SAME_WITH_SOME_CHILD;
       context.putNodeDistribution(
-          newNode.getId(), new NodeDistribution(distributionType, dataRegion));
+          newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion));
 
       // If the distributionType of all the children are same, no ExchangeNode need to be added.
       if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
@@ -253,7 +255,7 @@ public class DistributionPlanner {
       // parent.
       visitedChildren.forEach(
           child -> {
-            if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
+            if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).dataRegion)) {
               ExchangeNode exchangeNode =
                   new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
               exchangeNode.setChild(child);
@@ -269,14 +271,14 @@ public class DistributionPlanner {
         List<PlanNode> children, NodeGroupContext context) {
       // We always make the dataRegion of TimeJoinNode to be the same as its first child.
       // TODO: (xingtanzjr) We need to implement more suitable policies here
-      return context.getNodeDistribution(children.get(0).getId()).dataRegion;
+      return context.getNodeDistribution(children.get(0).getPlanNodeId()).dataRegion;
     }
 
     private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
       // The size of children here should always be larger than 0, or our code has Bug.
-      NodeDistribution first = context.getNodeDistribution(children.get(0).getId());
+      NodeDistribution first = context.getNodeDistribution(children.get(0).getPlanNodeId());
       for (int i = 1; i < children.size(); i++) {
-        NodeDistribution next = context.getNodeDistribution(children.get(i).getId());
+        NodeDistribution next = context.getNodeDistribution(children.get(i).getPlanNodeId());
         if (first.dataRegion == null || !first.dataRegion.equals(next.dataRegion)) {
           return false;
         }
@@ -343,7 +345,7 @@ public class DistributionPlanner {
         ExchangeNode exchangeNode = (ExchangeNode) root;
         FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
         sinkNode.setChild(exchangeNode.getChild());
-        sinkNode.setDownStreamPlanNodeId(exchangeNode.getId());
+        sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
         // Record the source node info in the ExchangeNode so that we can keep the connection of
         // these nodes/fragments
         exchangeNode.setRemoteSourceNode(sinkNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 1af476b7a2..1dbc265c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -128,7 +128,9 @@ public class LocalExecutionPlanner {
       boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
       OperatorContext operatorContext =
           context.instanceContext.addOperatorContext(
-              context.getNextOperatorId(), node.getId(), SeriesScanOperator.class.getSimpleName());
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              SeriesScanOperator.class.getSimpleName());
 
       SeriesScanOperator seriesScanOperator =
           new SeriesScanOperator(
@@ -186,7 +188,9 @@ public class LocalExecutionPlanner {
       Operator child = node.getChild().accept(this, context);
       return new LimitOperator(
           context.instanceContext.addOperatorContext(
-              context.getNextOperatorId(), node.getId(), LimitOperator.class.getSimpleName()),
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              LimitOperator.class.getSimpleName()),
           node.getLimit(),
           child);
     }
@@ -215,7 +219,9 @@ public class LocalExecutionPlanner {
               .collect(Collectors.toList());
       OperatorContext operatorContext =
           context.instanceContext.addOperatorContext(
-              context.getNextOperatorId(), node.getId(), TimeJoinOperator.class.getSimpleName());
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              TimeJoinOperator.class.getSimpleName());
       return new TimeJoinOperator(operatorContext, children, node.getMergeOrder(), node.getTypes());
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index b3a63494ae..ae6e779fb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -286,7 +286,6 @@ public class LogicalPlanner {
           createAlignedTimeSeriesStatement.getCompressors(),
           createAlignedTimeSeriesStatement.getAliasList(),
           createAlignedTimeSeriesStatement.getTagsList(),
-          createAlignedTimeSeriesStatement.getTagOffsets(),
           createAlignedTimeSeriesStatement.getAttributesList());
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 4cf7551e13..f2c5da2b6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -81,7 +82,7 @@ public class PlanFragment {
   }
 
   private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
-    if (root.getId().equals(nodeId)) {
+    if (root.getPlanNodeId().equals(nodeId)) {
       return root;
     }
     for (PlanNode child : root.getChildren()) {
@@ -93,12 +94,12 @@ public class PlanFragment {
     return null;
   }
 
-  public static PlanFragment deserialize(ByteBuffer byteBuffer) {
+  public static PlanFragment deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
     return new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer));
   }
 
   // deserialize the plan node recursively
-  private static PlanNode deserializeHelper(ByteBuffer byteBuffer) {
+  private static PlanNode deserializeHelper(ByteBuffer byteBuffer) throws IllegalPathException {
     PlanNode root = PlanNodeType.deserialize(byteBuffer);
     int childrenCount = byteBuffer.getInt();
     for (int i = 0; i < childrenCount; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index fdedd49eda..4bdd01e9de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -107,7 +107,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
         PlanNode downStreamExchangeNode =
             downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
         ((ExchangeNode) downStreamExchangeNode)
-            .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getId());
+            .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getPlanNodeId());
       }
     }
   }
@@ -117,7 +117,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   }
 
   private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
-    planNodeMap.put(root.getId(), planFragmentId);
+    planNodeMap.put(root.getPlanNodeId(), planFragmentId);
     for (PlanNode child : root.getChildren()) {
       recordPlanNodeRelation(child, planFragmentId);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
index 1ce2f1cf1a..3ae3b9a508 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
@@ -43,7 +43,8 @@ public class SubPlan {
     StringBuilder result = new StringBuilder();
     result.append(
         String.format(
-            "SubPlan-%s. RootNodeId: %s\n", planFragment.getId(), planFragment.getRoot().getId()));
+            "SubPlan-%s. RootNodeId: %s\n",
+            planFragment.getId(), planFragment.getRoot().getPlanNodeId()));
     children.forEach(result::append);
     return result.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 518ba9a748..3fe488161c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -39,11 +39,11 @@ public abstract class PlanNode {
     this.id = id;
   }
 
-  public PlanNodeId getId() {
+  public PlanNodeId getPlanNodeId() {
     return id;
   }
 
-  public void setId(PlanNodeId id) {
+  public void setPlanNodeId(PlanNodeId id) {
     this.id = id;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 2b0df3b0aa..27f8c3d168 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,8 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
@@ -65,7 +68,9 @@ public enum PlanNodeType {
   SHOW_DEVICES((short) 18),
   CREATE_TIME_SERIES((short) 19),
   EXCHANGE((short) 20),
-  AUTHOR((short) 21);
+  AUTHOR((short) 21),
+  ALTER_TIME_SERIES((short) 22),
+  CREATE_ALIGNED_TIME_SERIES((short) 23);
 
   private final short nodeType;
 
@@ -77,7 +82,7 @@ public enum PlanNodeType {
     buffer.putShort(nodeType);
   }
 
-  public static PlanNode deserialize(ByteBuffer buffer) {
+  public static PlanNode deserialize(ByteBuffer buffer) throws IllegalPathException {
     short nodeType = buffer.getShort();
     switch (nodeType) {
       case 0:
@@ -124,6 +129,10 @@ public enum PlanNodeType {
         return ExchangeNode.deserialize(buffer);
       case 21:
         return AuthorNode.deserialize(buffer);
+      case 22:
+        return AlterTimeSeriesNode.deserialize(buffer);
+      case 23:
+        return CreateAlignedTimeSeriesNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 7e426bbcc0..492a1b892b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -19,15 +19,20 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class AlterTimeSeriesNode extends PlanNode {
   private PartialPath path;
@@ -136,5 +141,127 @@ public class AlterTimeSeriesNode extends PlanNode {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public void serialize(ByteBuffer byteBuffer) {
+    byteBuffer.putShort((short) PlanNodeType.ALTER_TIME_SERIES.ordinal());
+    ReadWriteIOUtils.write(this.getPlanNodeId().getId(), byteBuffer);
+    byte[] bytes = path.getFullPath().getBytes();
+    byteBuffer.putInt(bytes.length);
+    byteBuffer.put(bytes);
+    byteBuffer.put((byte) alterType.ordinal());
+
+    // alias
+    if (alias != null) {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(alias, byteBuffer);
+    } else {
+      byteBuffer.put((byte) 0);
+    }
+
+    // alterMap
+    if (alterMap == null) {
+      byteBuffer.put((byte) -1);
+    } else if (alterMap.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(alterMap, byteBuffer);
+    }
+
+    // tagsMap
+    if (tagsMap == null) {
+      byteBuffer.put((byte) -1);
+    } else if (tagsMap.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(tagsMap, byteBuffer);
+    }
+
+    // attributesMap
+    if (attributesMap == null) {
+      byteBuffer.put((byte) -1);
+    } else if (attributesMap.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(attributesMap, byteBuffer);
+    }
+
+    // no children node, need to set 0
+    byteBuffer.putInt(0);
+  }
+
+  public static AlterTimeSeriesNode deserialize(ByteBuffer byteBuffer) throws IllegalPathException {
+    String id;
+    PartialPath path = null;
+    AlterType alterType = null;
+    String alias = null;
+    Map<String, String> alterMap = null;
+    Map<String, String> tagsMap = null;
+    Map<String, String> attributesMap = null;
+
+    id = ReadWriteIOUtils.readString(byteBuffer);
+    int length = byteBuffer.getInt();
+    byte[] bytes = new byte[length];
+    byteBuffer.get(bytes);
+    path = new PartialPath(new String(bytes));
+    alterType = AlterType.values()[byteBuffer.get()];
+
+    // alias
+    if (byteBuffer.get() == 1) {
+      alias = ReadWriteIOUtils.readString(byteBuffer);
+    }
+
+    // alterMap
+    byte label = byteBuffer.get();
+    if (label == 0) {
+      alterMap = new HashMap<>();
+    } else if (label == 1) {
+      alterMap = ReadWriteIOUtils.readMap(byteBuffer);
+    }
+
+    // tagsMap
+    label = byteBuffer.get();
+    if (label == 0) {
+      tagsMap = new HashMap<>();
+    } else if (label == 1) {
+      tagsMap = ReadWriteIOUtils.readMap(byteBuffer);
+    }
+
+    // attributesMap
+    label = byteBuffer.get();
+    if (label == 0) {
+      attributesMap = new HashMap<>();
+    } else if (label == 1) {
+      attributesMap = ReadWriteIOUtils.readMap(byteBuffer);
+    }
+    return new AlterTimeSeriesNode(
+        new PlanNodeId(id), path, alterType, alterMap, alias, tagsMap, attributesMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    AlterTimeSeriesNode that = (AlterTimeSeriesNode) o;
+
+    return this.getPlanNodeId().equals(that.getPlanNodeId())
+        && Objects.equals(path, that.path)
+        && alterType == that.alterType
+        && Objects.equals(alterMap, that.alterMap)
+        && Objects.equals(alias, that.alias)
+        && Objects.equals(tagsMap, that.tagsMap)
+        && Objects.equals(attributesMap, that.attributesMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        this.getPlanNodeId(), path, alias, alterType, alterMap, attributesMap, tagsMap);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index c24cf8d40a..722c6575e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -19,17 +19,22 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class CreateAlignedTimeSeriesNode extends PlanNode {
   private PartialPath devicePath;
@@ -51,7 +56,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
       List<CompressionType> compressors,
       List<String> aliasList,
       List<Map<String, String>> tagsList,
-      List<Long> tagOffsets,
       List<Map<String, String>> attributesList) {
     super(id);
     this.devicePath = devicePath;
@@ -61,7 +65,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
     this.compressors = compressors;
     this.aliasList = aliasList;
     this.tagsList = tagsList;
-    this.tagOffsets = tagOffsets;
     this.attributesList = attributesList;
   }
 
@@ -161,5 +164,188 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public void serialize(ByteBuffer byteBuffer) {
+    byteBuffer.putShort((short) PlanNodeType.CREATE_ALIGNED_TIME_SERIES.ordinal());
+    ReadWriteIOUtils.write(this.getPlanNodeId().getId(), byteBuffer);
+    byte[] bytes = devicePath.getFullPath().getBytes();
+    byteBuffer.putInt(bytes.length);
+    byteBuffer.put(bytes);
+
+    // measurements
+    byteBuffer.putInt(measurements.size());
+    for (String measurement : measurements) {
+      ReadWriteIOUtils.write(measurement, byteBuffer);
+    }
+
+    // dataTypes
+    for (TSDataType dataType : dataTypes) {
+      byteBuffer.put((byte) dataType.ordinal());
+    }
+
+    // encodings
+    for (TSEncoding encoding : encodings) {
+      byteBuffer.put((byte) encoding.ordinal());
+    }
+
+    // compressors
+    for (CompressionType compressor : compressors) {
+      byteBuffer.put((byte) compressor.ordinal());
+    }
+
+    // alias
+    if (aliasList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (aliasList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (String alias : aliasList) {
+        ReadWriteIOUtils.write(alias, byteBuffer);
+      }
+    }
+
+    // tags
+    if (tagsList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (tagsList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> tags : tagsList) {
+        ReadWriteIOUtils.write(tags, byteBuffer);
+      }
+    }
+
+    // attributes
+    if (attributesList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (attributesList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> attributes : attributesList) {
+        ReadWriteIOUtils.write(attributes, byteBuffer);
+      }
+    }
+
+    // no children node, need to set 0
+    byteBuffer.putInt(0);
+  }
+
+  public static CreateAlignedTimeSeriesNode deserialize(ByteBuffer byteBuffer)
+      throws IllegalPathException {
+    String id;
+    PartialPath devicePath = null;
+    List<String> measurements;
+    List<TSDataType> dataTypes;
+    List<TSEncoding> encodings;
+    List<CompressionType> compressors;
+    List<String> aliasList = null;
+    List<Map<String, String>> tagsList = null;
+    List<Map<String, String>> attributesList = null;
+
+    id = ReadWriteIOUtils.readString(byteBuffer);
+    int length = byteBuffer.getInt();
+    byte[] bytes = new byte[length];
+    byteBuffer.get(bytes);
+    devicePath = new PartialPath(new String(bytes));
+
+    measurements = new ArrayList<>();
+    int size = byteBuffer.getInt();
+    for (int i = 0; i < size; i++) {
+      measurements.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+
+    dataTypes = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      dataTypes.add(TSDataType.values()[byteBuffer.get()]);
+    }
+
+    encodings = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      encodings.add(TSEncoding.values()[byteBuffer.get()]);
+    }
+
+    compressors = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      compressors.add(CompressionType.values()[byteBuffer.get()]);
+    }
+
+    byte label = byteBuffer.get();
+    if (label >= 0) {
+      aliasList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          aliasList.add(ReadWriteIOUtils.readString(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      tagsList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          tagsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      attributesList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          attributesList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    return new CreateAlignedTimeSeriesNode(
+        new PlanNodeId(id),
+        devicePath,
+        measurements,
+        dataTypes,
+        encodings,
+        compressors,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreateAlignedTimeSeriesNode that = (CreateAlignedTimeSeriesNode) o;
+    return this.getPlanNodeId().equals(that.getPlanNodeId())
+        && Objects.equals(devicePath, that.devicePath)
+        && Objects.equals(measurements, that.measurements)
+        && Objects.equals(dataTypes, that.dataTypes)
+        && Objects.equals(encodings, that.encodings)
+        && Objects.equals(compressors, that.compressors)
+        && Objects.equals(tagOffsets, that.tagOffsets)
+        && Objects.equals(aliasList, that.aliasList)
+        && Objects.equals(tagsList, that.tagsList)
+        && Objects.equals(attributesList, that.attributesList);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        this.getPlanNodeId(),
+        devicePath,
+        measurements,
+        dataTypes,
+        encodings,
+        compressors,
+        tagOffsets,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 7cc4b53380..c30f48e3ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -80,7 +80,7 @@ public class DeviceMergeNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new DeviceMergeNode(getId(), mergeOrder);
+    return new DeviceMergeNode(getPlanNodeId(), mergeOrder);
   }
 
   @Override
@@ -122,7 +122,7 @@ public class DeviceMergeNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[DeviceMergeNode (%s)]", this.getId());
+    String title = String.format("[DeviceMergeNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : this.getMergeOrder()));
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 4aa54b84da..21a3374644 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -62,10 +62,10 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   public PlanNode clone() {
-    ExchangeNode node = new ExchangeNode(getId());
+    ExchangeNode node = new ExchangeNode(getPlanNodeId());
     if (remoteSourceNode != null) {
       FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
-      remoteSourceNodeClone.setDownStreamPlanNodeId(node.getId());
+      remoteSourceNodeClone.setDownStreamPlanNodeId(node.getPlanNodeId());
       node.setRemoteSourceNode(remoteSourceNode);
     }
     return node;
@@ -103,7 +103,8 @@ public class ExchangeNode extends PlanNode {
   }
 
   public String toString() {
-    return String.format("ExchangeNode-%s: [SourceAddress:%s]", getId(), getSourceAddress());
+    return String.format(
+        "ExchangeNode-%s: [SourceAddress:%s]", getPlanNodeId(), getSourceAddress());
   }
 
   public String getSourceAddress() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 4dae79f4db..984fb047e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -60,7 +60,7 @@ public class FillNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new FillNode(getId(), fillPolicy);
+    return new FillNode(getPlanNodeId(), fillPolicy);
   }
 
   @Override
@@ -97,7 +97,7 @@ public class FillNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[FillNode (%s)]", this.getId());
+    String title = String.format("[FillNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("FillPolicy: " + this.getFillPolicy());
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 53c2cbadb1..3347eead1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -60,7 +60,7 @@ public class FilterNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNode(getId(), predicate);
+    return new FilterNode(getPlanNodeId(), predicate);
   }
 
   @Override
@@ -95,7 +95,7 @@ public class FilterNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[FilterNode (%s)]", this.getId());
+    String title = String.format("[FilterNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("QueryFilter: " + this.getPredicate());
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index 068d0f1249..ef1a705b24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -73,7 +73,7 @@ public class FilterNullNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNullNode(getId(), discardPolicy);
+    return new FilterNullNode(getPlanNodeId(), discardPolicy);
   }
 
   @Override
@@ -112,7 +112,7 @@ public class FilterNullNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[FilterNullNode (%s)]", this.getId());
+    String title = String.format("[FilterNullNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("FilterNullPolicy: " + this.getDiscardPolicy());
     attributes.add("FilterNullColumnNames: " + this.getFilterNullColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index b36d60b92d..a194ea05db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -115,7 +115,7 @@ public class GroupByLevelNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[GroupByLevelNode (%s)]", this.getId());
+    String title = String.format("[GroupByLevelNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("GroupByLevels: " + Arrays.toString(this.getGroupByLevels()));
     attributes.add("ColumnNames: " + this.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 837db3e20b..b0c6a4e93f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -59,7 +59,7 @@ public class LimitNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LimitNode(getId(), this.limit);
+    return new LimitNode(getPlanNodeId(), this.limit);
   }
 
   @Override
@@ -97,12 +97,12 @@ public class LimitNode extends ProcessNode {
   }
 
   public String toString() {
-    return "LimitNode-" + this.getId();
+    return "LimitNode-" + this.getPlanNodeId();
   }
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[LimitNode (%s)]", this.getId());
+    String title = String.format("[LimitNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("RowLimit: " + this.getLimit());
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 294b6a60f4..f614a2cd3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -60,7 +60,7 @@ public class OffsetNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new OffsetNode(getId(), offset);
+    return new OffsetNode(getPlanNodeId(), offset);
   }
 
   @Override
@@ -95,7 +95,7 @@ public class OffsetNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[OffsetNode (%s)]", this.getId());
+    String title = String.format("[OffsetNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("RowOffset: " + this.getOffset());
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 0994d538d1..4744d1f938 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -66,7 +66,7 @@ public class SortNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new SortNode(getId(), orderBy, sortOrder);
+    return new SortNode(getPlanNodeId(), orderBy, sortOrder);
   }
 
   @Override
@@ -97,7 +97,7 @@ public class SortNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[SortNode (%s)]", this.getId());
+    String title = String.format("[SortNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("SortOrder: " + (this.getSortOrder() == null ? "null" : this.getSortOrder()));
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index efad2faa57..be36edb3e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -77,7 +77,7 @@ public class TimeJoinNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new TimeJoinNode(getId(), this.mergeOrder, this.filterNullPolicy);
+    return new TimeJoinNode(getPlanNodeId(), this.mergeOrder, this.filterNullPolicy);
   }
 
   @Override
@@ -130,7 +130,7 @@ public class TimeJoinNode extends ProcessNode {
   }
 
   public String toString() {
-    return "TimeJoinNode-" + this.getId();
+    return "TimeJoinNode-" + this.getPlanNodeId();
   }
 
   public List<TSDataType> getTypes() {
@@ -139,7 +139,7 @@ public class TimeJoinNode extends ProcessNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[TimeJoinNode (%s)]", this.getId());
+    String title = String.format("[TimeJoinNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : this.getMergeOrder()));
     attributes.add(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 6aef36cba5..d8562d144f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -47,7 +47,7 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode clone() {
-    FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+    FragmentSinkNode sinkNode = new FragmentSinkNode(getPlanNodeId());
     sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
     return sinkNode;
   }
@@ -100,7 +100,8 @@ public class FragmentSinkNode extends SinkNode {
   }
 
   public String toString() {
-    return String.format("FragmentSinkNode-%s:[SendTo: (%s)]", getId(), getDownStreamAddress());
+    return String.format(
+        "FragmentSinkNode-%s:[SendTo: (%s)]", getPlanNodeId(), getDownStreamAddress());
   }
 
   public String getDownStreamAddress() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 6c0ed69834..2f5d88b91b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -153,7 +153,7 @@ public class SeriesAggregateScanNode extends SourceNode {
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[SeriesAggregateScanNode (%s)]", this.getId());
+    String title = String.format("[SeriesAggregateScanNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("AggregateFunction: " + this.getExpressionString());
     return new Pair<>(title, attributes);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index b8e7a0638b..37ba66c7fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -145,7 +145,7 @@ public class SeriesScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new SeriesScanNode(getId(), getSeriesPath(), this.regionReplicaSet);
+    return new SeriesScanNode(getPlanNodeId(), getSeriesPath(), this.regionReplicaSet);
   }
 
   @Override
@@ -193,12 +193,12 @@ public class SeriesScanNode extends SourceNode {
   public String toString() {
     return String.format(
         "SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
-        this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
+        this.getPlanNodeId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
   }
 
   @TestOnly
   public Pair<String, List<String>> print() {
-    String title = String.format("[SeriesScanNode (%s)]", this.getId());
+    String title = String.format("[SeriesScanNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("SeriesPath: " + this.getSeriesPath());
     attributes.add("scanOrder: " + this.getScanOrder());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index eab6460aab..f48cf01272 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -225,7 +225,7 @@ public class InsertTabletNode extends InsertNode {
       }
       InsertTabletNode subNode =
           new InsertTabletNode(
-              getId(),
+              getPlanNodeId(),
               devicePath,
               isAligned,
               measurements,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 9f20df2c9c..87bea82d7a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
@@ -44,6 +45,7 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -216,6 +218,15 @@ public class LogicalPlannerTest {
             }
           },
           createAlignedTimeSeriesNode.getTagsList());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      createAlignedTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode1 =
+          (CreateAlignedTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(createAlignedTimeSeriesNode.equals(createAlignedTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -238,6 +249,15 @@ public class LogicalPlannerTest {
             }
           },
           alterTimeSeriesNode.getAlterMap());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      alterTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      AlterTimeSeriesNode alterTimeSeriesNode1 =
+          (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -258,6 +278,15 @@ public class LogicalPlannerTest {
             }
           },
           alterTimeSeriesNode.getAlterMap());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      alterTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      AlterTimeSeriesNode alterTimeSeriesNode1 =
+          (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -278,6 +307,15 @@ public class LogicalPlannerTest {
             }
           },
           alterTimeSeriesNode.getAlterMap());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      alterTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      AlterTimeSeriesNode alterTimeSeriesNode1 =
+          (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -298,6 +336,15 @@ public class LogicalPlannerTest {
             }
           },
           alterTimeSeriesNode.getAlterMap());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      alterTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      AlterTimeSeriesNode alterTimeSeriesNode1 =
+          (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -318,6 +365,15 @@ public class LogicalPlannerTest {
             }
           },
           alterTimeSeriesNode.getAlterMap());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      alterTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      AlterTimeSeriesNode alterTimeSeriesNode1 =
+          (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -347,6 +403,15 @@ public class LogicalPlannerTest {
             }
           },
           alterTimeSeriesNode.getAttributesMap());
+
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      alterTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+
+      AlterTimeSeriesNode alterTimeSeriesNode1 =
+          (AlterTimeSeriesNode) PlanNodeType.deserialize(byteBuffer);
+      Assert.assertTrue(alterTimeSeriesNode.equals(alterTimeSeriesNode1));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 51e477be94..f8a488b8fb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -143,16 +143,24 @@ public class ReadWriteIOUtils {
     buffer.putInt(map.size());
     length += 4;
     for (Entry<String, String> entry : map.entrySet()) {
-      bytes = entry.getKey().getBytes();
-      buffer.putInt(bytes.length);
+      if (entry.getKey() == null) {
+        buffer.putInt(-1);
+      } else {
+        bytes = entry.getKey().getBytes();
+        buffer.putInt(bytes.length);
+        buffer.put(bytes);
+        length += bytes.length;
+      }
       length += 4;
-      buffer.put(bytes);
-      length += bytes.length;
-      bytes = entry.getValue().getBytes();
-      buffer.putInt(bytes.length);
+      if (entry.getValue() == null) {
+        buffer.putInt(-1);
+      } else {
+        bytes = entry.getValue().getBytes();
+        buffer.putInt(bytes.length);
+        buffer.put(bytes);
+        length += bytes.length;
+      }
       length += 4;
-      buffer.put(bytes);
-      length += bytes.length;
     }
     return length;
   }