You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/14 07:32:22 UTC

[iotdb] branch master updated: [IOTDB-3067] Eliminate PhysicalPlan usage in SchemaRegion (#7573)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 66467bc30d [IOTDB-3067] Eliminate PhysicalPlan usage in SchemaRegion (#7573)
66467bc30d is described below

commit 66467bc30d7e416f4491561f49ac8236d04da80c
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Fri Oct 14 15:32:15 2022 +0800

    [IOTDB-3067] Eliminate PhysicalPlan usage in SchemaRegion (#7573)
---
 .../schemaregion/rocksdb/RSchemaRegion.java        |  32 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   2 +-
 .../apache/iotdb/db/metadata/idtable/IDTable.java  |   8 +-
 .../db/metadata/idtable/IDTableHashmapImpl.java    |  12 +-
 .../db/metadata/logfile/BufferedSerializer.java    |  54 +++
 .../db/metadata/logfile/FakeCRC32Deserializer.java |  91 +++++
 .../db/metadata/logfile/FakeCRC32Serializer.java   |  65 ++++
 .../iotdb/db/metadata/logfile/IDeserializer.java   |  41 +++
 .../iotdb/db/metadata/logfile/ISerializer.java     |  42 +++
 .../iotdb/db/metadata/logfile/MLogTxtWriter.java   |   2 +-
 .../iotdb/db/metadata/logfile/SchemaLogReader.java | 247 +++++++++++++
 .../iotdb/db/metadata/logfile/SchemaLogWriter.java | 114 ++++++
 .../plan/schemaregion/ISchemaRegionPlan.java       |  27 ++
 .../plan/schemaregion/SchemaRegionPlanType.java    |  75 ++++
 .../plan/schemaregion/SchemaRegionPlanVisitor.java |  90 +++++
 .../impl/ActivateTemplateInClusterPlanImpl.java    |  79 ++++
 .../impl/ActivateTemplatePlanImpl.java             |  44 +++
 .../impl/AutoCreateDeviceMNodePlanImpl.java        |  44 +++
 .../schemaregion/impl/ChangeAliasPlanImpl.java     |  55 +++
 .../schemaregion/impl/ChangeTagOffsetPlanImpl.java |  55 +++
 .../impl/CreateAlignedTimeSeriesPlanImpl.java      | 171 +++++++++
 .../impl/CreateTimeSeriesPlanImpl.java             | 163 +++++++++
 .../impl/DeleteTimeSeriesPlanImpl.java             |  45 +++
 .../impl/PreDeleteTimeSeriesPlanImpl.java          |  44 +++
 .../impl/RollbackPreDeleteTimeSeriesPlanImpl.java  |  44 +++
 .../impl/SchemaRegionPlanDeserializer.java         | 337 +++++++++++++++++
 .../schemaregion/impl/SchemaRegionPlanFactory.java | 140 ++++++++
 .../impl/SchemaRegionPlanSerializer.java           | 399 +++++++++++++++++++++
 .../impl/SchemaRegionPlanTxtSerializer.java        | 238 ++++++++++++
 .../schemaregion/impl/SetTemplatePlanImpl.java}    |  62 +---
 .../schemaregion/impl/UnsetTemplatePlanImpl.java}  |  62 +---
 .../write/IActivateTemplateInClusterPlan.java      |  60 ++++
 .../schemaregion/write/IActivateTemplatePlan.java  |  42 +++
 .../write/IAutoCreateDeviceMNodePlan.java          |  42 +++
 .../plan/schemaregion/write/IChangeAliasPlan.java  |  46 +++
 .../schemaregion/write/IChangeTagOffsetPlan.java   |  46 +++
 .../write/ICreateAlignedTimeSeriesPlan.java        |  80 +++++
 .../schemaregion/write/ICreateTimeSeriesPlan.java  |  79 ++++
 .../schemaregion/write/IDeleteTimeSeriesPlan.java  |  44 +++
 .../write/IPreDeleteTimeSeriesPlan.java            |  42 +++
 .../write/IRollbackPreDeleteTimeSeriesPlan.java    |  42 +++
 .../plan/schemaregion/write/ISetTemplatePlan.java  |  45 +++
 .../schemaregion/write/IUnsetTemplatePlan.java     |  45 +++
 .../db/metadata/schemaregion/ISchemaRegion.java    |  28 +-
 .../schemaregion/SchemaRegionMemoryImpl.java       | 379 ++++++++++++-------
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 330 ++++++++++-------
 .../metadata/visitor/SchemaExecutionVisitor.java   |  73 +---
 .../node/metedata/write/ActivateTemplateNode.java  |  15 +-
 .../write/CreateAlignedTimeSeriesNode.java         |  25 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |  17 +-
 .../sys/ActivateTemplateInClusterPlan.java         |  19 +-
 .../db/qp/physical/sys/ActivateTemplatePlan.java   |   8 +-
 .../qp/physical/sys/AutoCreateDeviceMNodePlan.java |   8 +-
 .../iotdb/db/qp/physical/sys/ChangeAliasPlan.java  |   3 +-
 .../db/qp/physical/sys/ChangeTagOffsetPlan.java    |   3 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |  38 +-
 .../db/qp/physical/sys/CreateTimeSeriesPlan.java   |   3 +-
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |   8 +-
 .../qp/physical/sys/PreDeleteTimeSeriesPlan.java   |   3 +-
 .../sys/RollbackPreDeleteTimeSeriesPlan.java       |   4 +-
 .../iotdb/db/qp/physical/sys/SetTemplatePlan.java  |   3 +-
 .../db/qp/physical/sys/UnsetTemplatePlan.java      |   3 +-
 .../apache/iotdb/db/tools/schema/MLogParser.java   | 122 +------
 .../plan/SchemaRegionPlanCompatibilityTest.java    | 318 ++++++++++++++++
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |  17 -
 65 files changed, 4308 insertions(+), 616 deletions(-)

diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index a39e42217f..f35c01a161 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -48,6 +48,13 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.mnode.MNodeType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
 import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.REntityMNode;
@@ -60,15 +67,8 @@ import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
@@ -239,7 +239,7 @@ public class RSchemaRegion implements ISchemaRegion {
   }
 
   @Override
-  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+  public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
     try {
       if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
         createTimeseries(
@@ -507,8 +507,8 @@ public class RSchemaRegion implements ISchemaRegion {
   }
 
   @Override
-  public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
-    PartialPath prefixPath = plan.getPrefixPath();
+  public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
+    PartialPath prefixPath = plan.getDevicePath();
     List<String> measurements = plan.getMeasurements();
     List<TSDataType> dataTypes = plan.getDataTypes();
     List<TSEncoding> encodings = plan.getEncodings();
@@ -518,7 +518,7 @@ public class RSchemaRegion implements ISchemaRegion {
         createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings);
         // update id table if not in recovering or disable id table log file
         if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
-          IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
+          IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getDevicePath());
           idTable.createAlignedTimeseries(plan);
         }
       } else {
@@ -853,7 +853,7 @@ public class RSchemaRegion implements ISchemaRegion {
   }
 
   @Override
-  public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
+  public void autoCreateDeviceMNode(IAutoCreateDeviceMNodePlan plan) throws MetadataException {
     throw new UnsupportedOperationException();
   }
 
@@ -2005,22 +2005,22 @@ public class RSchemaRegion implements ISchemaRegion {
   }
 
   @Override
-  public void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
+  public void setSchemaTemplate(ISetTemplatePlan plan) throws MetadataException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
+  public void unsetSchemaTemplate(IUnsetTemplatePlan plan) throws MetadataException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
+  public void setUsingSchemaTemplate(IActivateTemplatePlan plan) throws MetadataException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template)
+  public void activateSchemaTemplate(IActivateTemplateInClusterPlan plan, Template template)
       throws MetadataException {
     throw new UnsupportedOperationException();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 7e8f64c4a8..273c4a179a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -340,7 +340,7 @@ public class LocalSchemaProcessor {
    * @param plan CreateAlignedTimeSeriesPlan
    */
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
-    getBelongedSchemaRegionWithAutoCreate(plan.getPrefixPath()).createAlignedTimeSeries(plan);
+    getBelongedSchemaRegionWithAutoCreate(plan.getDevicePath()).createAlignedTimeSeries(plan);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
index fbe261b5d9..8723e26375 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
@@ -33,9 +33,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -61,7 +61,7 @@ public interface IDTable {
    * @param plan create aligned timeseries plan
    * @throws MetadataException if the device is not aligned, throw it
    */
-  void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan) throws MetadataException;
+  void createAlignedTimeseries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException;
 
   /**
    * create timeseries
@@ -69,7 +69,7 @@ public interface IDTable {
    * @param plan create timeseries plan
    * @throws MetadataException if the device is aligned, throw it
    */
-  void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException;
+  void createTimeseries(ICreateTimeSeriesPlan plan) throws MetadataException;
 
   /**
    * Delete all timeseries matching the given paths
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
index af3cb4d9bc..5880cb7c28 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
@@ -33,10 +33,10 @@ import org.apache.iotdb.db.metadata.idtable.entry.InsertMeasurementMNode;
 import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
 import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -93,13 +93,13 @@ public class IDTableHashmapImpl implements IDTable {
    * @throws MetadataException if the device is not aligned, throw it
    */
   @Override
-  public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
+  public synchronized void createAlignedTimeseries(ICreateAlignedTimeSeriesPlan plan)
       throws MetadataException {
-    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath().toString(), true);
+    DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getDevicePath().toString(), true);
 
     for (int i = 0; i < plan.getMeasurements().size(); i++) {
       PartialPath fullPath =
-          new PartialPath(plan.getPrefixPath().toString(), plan.getMeasurements().get(i));
+          new PartialPath(plan.getDevicePath().toString(), plan.getMeasurements().get(i));
       SchemaEntry schemaEntry =
           new SchemaEntry(
               plan.getDataTypes().get(i),
@@ -120,7 +120,7 @@ public class IDTableHashmapImpl implements IDTable {
    * @throws MetadataException if the device is aligned, throw it
    */
   @Override
-  public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
+  public synchronized void createTimeseries(ICreateTimeSeriesPlan plan) throws MetadataException {
     DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevice(), false);
     SchemaEntry schemaEntry =
         new SchemaEntry(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/BufferedSerializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/BufferedSerializer.java
new file mode 100644
index 0000000000..cdfdbb19a0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/BufferedSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Buffers the bytes produced by a nested serializer and writes each element to the target at once.
+ *
+ * @param <T> the type of element to serialize
+ */
+@NotThreadSafe
+public class BufferedSerializer<T> implements ISerializer<T> {
+  private static final int INITIALIZED_BUFFER_SIZE = 8192;
+
+  private final ByteArrayOutputStream logBufferStream =
+      new ByteArrayOutputStream(INITIALIZED_BUFFER_SIZE);
+  private final ISerializer<T> serializer;
+
+  public BufferedSerializer(ISerializer<T> serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public void serialize(T t, OutputStream outputStream) throws IOException {
+    try {
+      serializer.serialize(t, logBufferStream);
+      logBufferStream.writeTo(outputStream);
+    } finally {
+      logBufferStream.reset(); // clear buffer, even on failure, so nothing leaks into the next call
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/FakeCRC32Deserializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/FakeCRC32Deserializer.java
new file mode 100644
index 0000000000..40a6f0c046
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/FakeCRC32Deserializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This class is used to keep mlog compatible with that of 0.14 snapshot versions. The element T
+ * will be deserialized from InputStream in the format: content data length (4B) + content data
+ * (var length) + validation code (long). The validation code is a meaningless long value that is
+ * written as a placeholder; it is read and then discarded.
+ */
+@NotThreadSafe
+public class FakeCRC32Deserializer<T> implements IDeserializer<T> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FakeCRC32Deserializer.class);
+
+  private static final InputStream EMPTY_INPUT_STREAM_PLACE_HOLDER =
+      new ByteArrayInputStream(new byte[0]);
+
+  private final IDeserializer<T> deserializer;
+  private final ConfigurableDataInputStream dataInputStream =
+      new ConfigurableDataInputStream(EMPTY_INPUT_STREAM_PLACE_HOLDER);
+
+  public FakeCRC32Deserializer(IDeserializer<T> deserializer) {
+    this.deserializer = deserializer;
+  }
+
+  /** Reads one length-prefixed record; throws {@link EOFException} on a truncated record. */
+  @Override
+  public T deserialize(InputStream inputStream) throws IOException {
+    dataInputStream.changeInputStream(inputStream);
+    int logLength = dataInputStream.readInt();
+    if (logLength <= 0) {
+      LOGGER.error("Read log length {} is not positive.", logLength);
+      throw new IOException(
+          new IllegalArgumentException(
+              String.format("Read log length %s is not positive.", logLength)));
+    }
+
+    // a single read() may return fewer bytes than requested (or -1 at EOF), so use readFully:
+    // it fills the whole buffer or throws EOFException when the record is truncated
+    byte[] logBuffer = new byte[logLength];
+    dataInputStream.readFully(logBuffer);
+    T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer));
+
+    // read a long to keep compatible with old version (CRC32 code)
+    dataInputStream.readLong();
+    return result;
+  }
+
+  /** DataInputStream whose underlying stream can be swapped, so one instance can be reused. */
+  private static class ConfigurableDataInputStream extends DataInputStream {
+
+    private ConfigurableDataInputStream(@NotNull InputStream in) {
+      super(in);
+    }
+
+    private void changeInputStream(InputStream in) {
+      this.in = in;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/FakeCRC32Serializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/FakeCRC32Serializer.java
new file mode 100644
index 0000000000..99408f6af6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/FakeCRC32Serializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This class is used to keep mlog compatible with that of 0.14 snapshot versions. The element T
+ * will be serialized to OutputStream in the format: content data length (4B) + content data (var
+ * length) + validation code (long). The validation code is filled with a meaningless long value.
+ */
+@NotThreadSafe
+public class FakeCRC32Serializer<T> implements ISerializer<T> {
+  // bytes data of a long for compatibility with old version (CRC32 code)
+  private static final byte[] PLACE_HOLDER = new byte[Long.BYTES];
+  private static final int INITIALIZED_BUFFER_SIZE = 8192;
+
+  private final ByteArrayOutputStream logBufferStream =
+      new ByteArrayOutputStream(INITIALIZED_BUFFER_SIZE);
+  private final ByteBuffer logLengthBuffer = ByteBuffer.allocate(Integer.BYTES);
+  private final ISerializer<T> serializer;
+
+  public FakeCRC32Serializer(ISerializer<T> serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public void serialize(T t, OutputStream outputStream) throws IOException {
+    try {
+      serializer.serialize(t, logBufferStream);
+      // write the length of plan data
+      logLengthBuffer.putInt(logBufferStream.size());
+      outputStream.write(logLengthBuffer.array());
+      // write a long to keep compatible with old version (CRC32 code)
+      logBufferStream.write(PLACE_HOLDER);
+      logBufferStream.writeTo(outputStream);
+    } finally {
+      // always clear both buffers: leftovers from a failed call would corrupt the next record
+      logLengthBuffer.clear();
+      logBufferStream.reset();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/IDeserializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/IDeserializer.java
new file mode 100644
index 0000000000..6990790737
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/IDeserializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This interface defines the behaviour of a Deserializer of T. An instance of this interface
+ * provides the ability to deserialize an instance of T from an InputStream or a ByteBuffer.
+ *
+ * @param <T> the type of element produced by deserialization
+ */
+public interface IDeserializer<T> {
+  // Both variants are optional: each throws UnsupportedOperationException unless overridden.
+  default T deserialize(InputStream inputStream) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  default T deserialize(ByteBuffer buffer) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/ISerializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/ISerializer.java
new file mode 100644
index 0000000000..8c262e8dbe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/ISerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+/**
+ * This interface defines the behaviour of a Serializer of T. An instance of this interface provides
+ * the ability to serialize an instance of T to an OutputStream or a ByteBuffer.
+ *
+ * @param <T> the type of element to serialize
+ */
+public interface ISerializer<T> {
+  // NOTE(review): throws a checked exception here but an unchecked one below - confirm intent.
+  default void serialize(T t, OutputStream outputStream) throws IOException {
+    throw new UnsupportedEncodingException();
+  }
+
+  default void serialize(T t, ByteBuffer buffer) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogTxtWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogTxtWriter.java
index 6c68bcb51e..34f35b5efa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogTxtWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogTxtWriter.java
@@ -142,7 +142,7 @@ public class MLogTxtWriter implements AutoCloseable {
         String.format(
             "%s,%s,%s,%s,%s,%s",
             MetadataOperationType.CREATE_ALIGNED_TIMESERIES,
-            plan.getPrefixPath().getFullPath(),
+            plan.getDevicePath().getFullPath(),
             plan.getMeasurements(),
             plan.getDataTypes().stream().map(TSDataType::serialize).collect(Collectors.toList()),
             plan.getEncodings().stream().map(TSEncoding::serialize).collect(Collectors.toList()),
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/SchemaLogReader.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/SchemaLogReader.java
new file mode 100644
index 0000000000..fb79dcc170
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/SchemaLogReader.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.FileChannel;
+import java.util.NoSuchElementException;
+
+/**
+ * This class provides the common ability to read a log storing T. The corrupted file ending will be
+ * truncate during read process. If some middle part of the log file is corrupted, the read process
+ * will end and the file will be marked corrupted.
+ *
+ * @param <T>
+ */
+public class SchemaLogReader<T> implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaLogReader.class);
+
+  private final File logFile;
+
+  private final RecordableInputStream inputStream;
+
+  private final IDeserializer<T> deserializer;
+
+  private T nextSchemaPlan;
+
+  private long currentIndex = 0;
+  private boolean isFileCorrupted = false;
+
+  public SchemaLogReader(String schemaDir, String logFileName, IDeserializer<T> deserializer)
+      throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    inputStream = new RecordableInputStream(new BufferedInputStream(new FileInputStream(logFile)));
+    this.deserializer = deserializer;
+  }
+
+  public SchemaLogReader(String logFilePath, IDeserializer<T> deserializer) throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    inputStream = new RecordableInputStream(new BufferedInputStream(new FileInputStream(logFile)));
+    this.deserializer = deserializer;
+  }
+
+  public boolean hasNext() {
+    if (isFileCorrupted()) {
+      return false;
+    }
+
+    if (nextSchemaPlan == null) {
+      readNext();
+      return nextSchemaPlan != null;
+    } else {
+      return true;
+    }
+  }
+
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    T result = nextSchemaPlan;
+    nextSchemaPlan = null;
+    return result;
+  }
+
+  private void readNext() {
+    currentIndex = inputStream.getReadBytes();
+    try {
+      nextSchemaPlan = deserializer.deserialize(inputStream);
+    } catch (EOFException e) {
+      // meet the end of the file, truncate the broken ending
+      nextSchemaPlan = null;
+      truncateBrokenLogs();
+    } catch (IOException e) {
+      // failed to read file
+      nextSchemaPlan = null;
+      isFileCorrupted = true;
+      LOGGER.error(
+          "File {} is corrupted. The uncorrupted size is {}.", logFile.getPath(), currentIndex, e);
+    } catch (Exception e) {
+      // error occurred when deserializing the entry
+      nextSchemaPlan = null;
+      try {
+        if (inputStream.available() > 0) {
+          // error occurred when deserializing some middle part of the file
+          isFileCorrupted = true;
+          LOGGER.error(
+              "File {} is corrupted. The uncorrupted size is {}.",
+              logFile.getPath(),
+              currentIndex,
+              e);
+        } else {
+          // the file has already been all read out, but error occurred during deserializing the
+          // last entry in file ending.
+          LOGGER.warn(e.getMessage(), e);
+          truncateBrokenLogs();
+        }
+      } catch (IOException ex) {
+        // failed to read file
+        isFileCorrupted = true;
+        LOGGER.error(
+            "File {} is corrupted. The uncorrupted size is {}.",
+            logFile.getPath(),
+            currentIndex,
+            e);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    inputStream.close();
+    currentIndex = 0;
+  }
+
+  public boolean isFileCorrupted() {
+    return isFileCorrupted;
+  }
+
+  private void truncateBrokenLogs() {
+    try (FileOutputStream outputStream = new FileOutputStream(logFile, true);
+        FileChannel channel = outputStream.getChannel()) {
+      if (currentIndex != channel.size()) {
+        LOGGER.warn(
+            "The end of log file {} is corrupted. Start truncate it. The unbroken size is {}. The file size is {}.",
+            logFile.getName(),
+            currentIndex,
+            channel.size());
+        channel.truncate(currentIndex);
+        channel.force(true);
+      }
+      isFileCorrupted = false;
+    } catch (IOException e) {
+      isFileCorrupted = true;
+      LOGGER.error("Fail to truncate log file to size {}", currentIndex, e);
+    }
+  }
+
+  /** This class provides the ability to record the num of read bytes from the nested InputStream */
+  private static class RecordableInputStream extends InputStream {
+
+    private final InputStream inputStream;
+
+    private long readBytes = 0;
+
+    private long mark;
+
+    public RecordableInputStream(InputStream inputStream) {
+      this.inputStream = inputStream;
+    }
+
+    @Override
+    public int read() throws IOException {
+      int result = inputStream.read();
+      readBytes += Byte.BYTES;
+      return result;
+    }
+
+    @Override
+    public int read(@NotNull byte[] b) throws IOException {
+      int num = inputStream.read(b);
+      if (num < 0) {
+        return num;
+      }
+      readBytes += num;
+      return num;
+    }
+
+    @Override
+    public int read(@NotNull byte[] b, int off, int len) throws IOException {
+      int num = inputStream.read(b, off, len);
+      if (num < 0) {
+        return num;
+      }
+      readBytes += num;
+      return num;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      long num = inputStream.skip(n);
+      readBytes += num;
+      return num;
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      this.mark = readBytes;
+      inputStream.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+      return inputStream.markSupported();
+    }
+
+    @Override
+    public int available() throws IOException {
+      return inputStream.available();
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      inputStream.reset();
+      readBytes = mark;
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+      readBytes = 0;
+    }
+
+    public long getReadBytes() {
+      return readBytes;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/SchemaLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/SchemaLogWriter.java
new file mode 100644
index 0000000000..44d65f74ed
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/SchemaLogWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.logfile;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+
+/**
+ * This class provides the common ability to write a log storing T. Entries are appended to the log
+ * file through the given serializer.
+ *
+ * @param <T> the type of the entries accepted by the serializer
+ */
+public class SchemaLogWriter<T> implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaLogWriter.class);
+
+  private final File logFile;
+
+  private final FileOutputStream fileOutputStream;
+
+  private final ISerializer<T> serializer;
+
+  /** Whether every write() is followed by a sync of the file descriptor. */
+  private final boolean forceEachWrite;
+
+  /** False while there is written data that has not been synced yet. */
+  private boolean hasSynced = true;
+
+  /**
+   * Opens (in append mode) the log file {@code logFileName} inside {@code schemaDir}, trying to
+   * create the directory first if it does not exist.
+   *
+   * @param schemaDir the directory containing the log file
+   * @param logFileName the name of the log file
+   * @param serializer the serializer used to encode each entry
+   * @param forceEachWrite whether each write is synced immediately
+   * @throws IOException if the log file cannot be opened for writing
+   */
+  public SchemaLogWriter(
+      String schemaDir, String logFileName, ISerializer<T> serializer, boolean forceEachWrite)
+      throws IOException {
+    File dir = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!dir.exists()) {
+      if (dir.mkdirs()) {
+        LOGGER.info("create schema folder {}.", dir);
+      } else {
+        LOGGER.warn("create schema folder {} failed.", dir);
+      }
+    }
+
+    logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
+    fileOutputStream = new FileOutputStream(logFile, true);
+    this.serializer = serializer;
+
+    this.forceEachWrite = forceEachWrite;
+  }
+
+  /**
+   * Opens (in append mode) the log file at {@code logFilePath}.
+   *
+   * @param logFilePath the path of the log file
+   * @param serializer the serializer used to encode each entry
+   * @param forceEachWrite whether each write is synced immediately
+   * @throws IOException if the log file cannot be opened for writing
+   */
+  public SchemaLogWriter(String logFilePath, ISerializer<T> serializer, boolean forceEachWrite)
+      throws IOException {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    fileOutputStream = new FileOutputStream(logFile, true);
+    this.serializer = serializer;
+
+    this.forceEachWrite = forceEachWrite;
+  }
+
+  /**
+   * Appends one entry to the log, syncing afterwards if forceEachWrite is set.
+   *
+   * @param schemaPlan the entry to append
+   * @throws IOException if serializing or syncing fails
+   */
+  public synchronized void write(T schemaPlan) throws IOException {
+    hasSynced = false;
+    // serialize plan to binary data
+    serializer.serialize(schemaPlan, fileOutputStream);
+
+    if (forceEachWrite) {
+      syncBufferToDisk();
+    }
+  }
+
+  /**
+   * Syncs the file descriptor if there is data written since the last successful sync.
+   *
+   * @throws IOException if the sync fails; the data then still counts as unsynced, so a later call
+   *     retries
+   */
+  public synchronized void force() throws IOException {
+    if (!hasSynced) {
+      syncBufferToDisk();
+    }
+  }
+
+  /** Syncs the file descriptor; hasSynced is only set once the sync has succeeded. */
+  private void syncBufferToDisk() throws IOException {
+    fileOutputStream.getFD().sync();
+    hasSynced = true;
+  }
+
+  /**
+   * Closes the stream and deletes the log file if it exists.
+   *
+   * @throws IOException if closing or deleting fails
+   */
+  public synchronized void clear() throws IOException {
+    fileOutputStream.close();
+
+    // logFile is final and assigned by every constructor, so it cannot be null here
+    if (logFile.exists()) {
+      Files.delete(logFile.toPath());
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    fileOutputStream.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/ISchemaRegionPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/ISchemaRegionPlan.java
new file mode 100644
index 0000000000..682114c6ce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/ISchemaRegionPlan.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion;
+
+/**
+ * Common contract of the schema region plans: each plan reports its {@link SchemaRegionPlanType}
+ * and can be handed to a {@link SchemaRegionPlanVisitor}.
+ */
+public interface ISchemaRegionPlan {
+
+  /** Returns the type tag of this plan. */
+  SchemaRegionPlanType getPlanType();
+
+  /**
+   * Entry point for a {@link SchemaRegionPlanVisitor}.
+   *
+   * <p>NOTE(review): presumably each implementation calls the visit method matching its own plan
+   * type; the implementations are not visible here, so confirm against them.
+   *
+   * @param visitor the visitor to apply to this plan
+   * @param context the caller-supplied context passed along to the visitor
+   * @param <R> the result type of the visitor
+   * @param <C> the context type of the visitor
+   * @return the result produced by the visitor
+   */
+  <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/SchemaRegionPlanType.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/SchemaRegionPlanType.java
new file mode 100644
index 0000000000..469b867ee8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/SchemaRegionPlanType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The types of schema region plans, each identified by a one-byte code that is written in front of
+ * a serialized plan.
+ */
+public enum SchemaRegionPlanType {
+
+  // region These PlanType shall keep consistent with the PhysicalPlanType.ordinal to ensure
+  // compatibility
+  CREATE_TIMESERIES((byte) 4),
+  DELETE_TIMESERIES((byte) 21),
+  CHANGE_TAG_OFFSET((byte) 28),
+  CHANGE_ALIAS((byte) 29),
+  SET_TEMPLATE((byte) 38),
+  ACTIVATE_TEMPLATE((byte) 39),
+  AUTO_CREATE_DEVICE_MNODE((byte) 40),
+  CREATE_ALIGNED_TIMESERIES((byte) 41),
+  UNSET_TEMPLATE((byte) 57),
+  ACTIVATE_TEMPLATE_IN_CLUSTER((byte) 63),
+  PRE_DELETE_TIMESERIES_IN_CLUSTER((byte) 64),
+  ROLLBACK_PRE_DELETE_TIMESERIES((byte) 65);
+  // endregion
+
+  public static final int MAX_NUM = Byte.MAX_VALUE;
+
+  /** Lookup table from type code to type; codes without a type map to null. */
+  private static final SchemaRegionPlanType[] PLAN_TYPE_TABLE = new SchemaRegionPlanType[MAX_NUM];
+
+  static {
+    for (SchemaRegionPlanType type : SchemaRegionPlanType.values()) {
+      PLAN_TYPE_TABLE[type.planType] = type;
+    }
+  }
+
+  private final byte planType;
+
+  SchemaRegionPlanType(byte planType) {
+    this.planType = planType;
+  }
+
+  /** Returns the one-byte code of this type. */
+  public byte getPlanType() {
+    return planType;
+  }
+
+  /**
+   * Writes the one-byte code of this type to the stream.
+   *
+   * @param dataOutputStream the stream to write to
+   * @throws IOException if writing fails
+   */
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    dataOutputStream.writeByte(planType);
+  }
+
+  /**
+   * Reads one byte from the buffer and maps it to the type with that code.
+   *
+   * @param buffer the buffer positioned at the type code
+   * @return the type with the code that was read
+   * @throws IllegalArgumentException if no type has the code that was read, including negative
+   *     codes and codes beyond the lookup table
+   */
+  public static SchemaRegionPlanType deserialize(ByteBuffer buffer) {
+    byte code = buffer.get();
+    // a byte is signed and the table has MAX_NUM slots, so the raw code may be negative or equal
+    // to the table length; such codes must be rejected like any other unknown code instead of
+    // escaping as ArrayIndexOutOfBoundsException
+    SchemaRegionPlanType type =
+        code >= 0 && code < PLAN_TYPE_TABLE.length ? PLAN_TYPE_TABLE[code] : null;
+    if (type == null) {
+      throw new IllegalArgumentException("Unrecognized SchemaRegionPlanType of " + code);
+    }
+    return type;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/SchemaRegionPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/SchemaRegionPlanVisitor.java
new file mode 100644
index 0000000000..2789493ad2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/SchemaRegionPlanVisitor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion;
+
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
+
+/**
+ * Base class for visitors over {@link ISchemaRegionPlan}. Every plan-specific visit method
+ * delegates to {@link #visitSchemaRegionPlan} by default, so a subclass only overrides the plan
+ * types it handles specially.
+ *
+ * @param <R> the result type of the visit methods
+ * @param <C> the type of the context passed through the visit methods
+ */
+public abstract class SchemaRegionPlanVisitor<R, C> {
+
+  /**
+   * Fallback invoked by every plan-specific visit method that is not overridden.
+   *
+   * @param plan the visited plan
+   * @param context the caller-supplied context
+   * @return the result of visiting the plan
+   */
+  public abstract R visitSchemaRegionPlan(ISchemaRegionPlan plan, C context);
+
+  // Each method below forwards its plan unchanged to visitSchemaRegionPlan.
+
+  public R visitActivateTemplateInCluster(
+      IActivateTemplateInClusterPlan activateTemplateInClusterPlan, C context) {
+    return visitSchemaRegionPlan(activateTemplateInClusterPlan, context);
+  }
+
+  public R visitActivateTemplate(IActivateTemplatePlan activateTemplatePlan, C context) {
+    return visitSchemaRegionPlan(activateTemplatePlan, context);
+  }
+
+  public R visitAutoCreateDeviceMNode(
+      IAutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan, C context) {
+    return visitSchemaRegionPlan(autoCreateDeviceMNodePlan, context);
+  }
+
+  public R visitChangeAlias(IChangeAliasPlan changeAliasPlan, C context) {
+    return visitSchemaRegionPlan(changeAliasPlan, context);
+  }
+
+  public R visitChangeTagOffset(IChangeTagOffsetPlan changeTagOffsetPlan, C context) {
+    return visitSchemaRegionPlan(changeTagOffsetPlan, context);
+  }
+
+  public R visitCreateAlignedTimeSeries(
+      ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan, C context) {
+    return visitSchemaRegionPlan(createAlignedTimeSeriesPlan, context);
+  }
+
+  public R visitCreateTimeSeries(ICreateTimeSeriesPlan createTimeSeriesPlan, C context) {
+    return visitSchemaRegionPlan(createTimeSeriesPlan, context);
+  }
+
+  public R visitDeleteTimeSeries(IDeleteTimeSeriesPlan deleteTimeSeriesPlan, C context) {
+    return visitSchemaRegionPlan(deleteTimeSeriesPlan, context);
+  }
+
+  public R visitPreDeleteTimeSeries(IPreDeleteTimeSeriesPlan preDeleteTimeSeriesPlan, C context) {
+    return visitSchemaRegionPlan(preDeleteTimeSeriesPlan, context);
+  }
+
+  public R visitRollbackPreDeleteTimeSeries(
+      IRollbackPreDeleteTimeSeriesPlan rollbackPreDeleteTimeSeriesPlan, C context) {
+    return visitSchemaRegionPlan(rollbackPreDeleteTimeSeriesPlan, context);
+  }
+
+  public R visitSetTemplate(ISetTemplatePlan setTemplatePlan, C context) {
+    return visitSchemaRegionPlan(setTemplatePlan, context);
+  }
+
+  public R visitUnsetTemplate(IUnsetTemplatePlan unsetTemplatePlan, C context) {
+    return visitSchemaRegionPlan(unsetTemplatePlan, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ActivateTemplateInClusterPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ActivateTemplateInClusterPlanImpl.java
new file mode 100644
index 0000000000..39070aa709
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ActivateTemplateInClusterPlanImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+
+/**
+ * Plain field-backed implementation of {@link IActivateTemplateInClusterPlan}, holding the
+ * activation path, the template set level, the template id and the aligned flag.
+ */
+public class ActivateTemplateInClusterPlanImpl implements IActivateTemplateInClusterPlan {
+
+  private PartialPath activatePath;
+  private int templateSetLevel;
+  private int templateId;
+  private boolean isAligned;
+
+  /** Creates an empty plan whose fields are filled in through the setters. */
+  public ActivateTemplateInClusterPlanImpl() {}
+
+  /**
+   * Creates a plan with the given values; the aligned flag keeps its default of false.
+   *
+   * @param activatePath the path stored as the activation path
+   * @param templateSetLevel the value stored as the template set level
+   * @param templateId the value stored as the template id
+   */
+  public ActivateTemplateInClusterPlanImpl(
+      PartialPath activatePath, int templateSetLevel, int templateId) {
+    this.activatePath = activatePath;
+    this.templateSetLevel = templateSetLevel;
+    this.templateId = templateId;
+  }
+
+  public PartialPath getActivatePath() {
+    return activatePath;
+  }
+
+  @Override
+  public void setActivatePath(PartialPath activatePath) {
+    this.activatePath = activatePath;
+  }
+
+  @Override
+  public int getTemplateId() {
+    return templateId;
+  }
+
+  // the two setters below previously wrote to each other's field, so a value stored through
+  // setTemplateId was returned by getTemplateSetLevel and vice versa
+  @Override
+  public void setTemplateId(int templateId) {
+    this.templateId = templateId;
+  }
+
+  @Override
+  public int getTemplateSetLevel() {
+    return templateSetLevel;
+  }
+
+  @Override
+  public void setTemplateSetLevel(int templateSetLevel) {
+    this.templateSetLevel = templateSetLevel;
+  }
+
+  @Override
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    isAligned = aligned;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ActivateTemplatePlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ActivateTemplatePlanImpl.java
new file mode 100644
index 0000000000..885e52d4bc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ActivateTemplatePlanImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+
+/** Plain field-backed implementation of {@link IActivateTemplatePlan}, holding the prefix path. */
+public class ActivateTemplatePlanImpl implements IActivateTemplatePlan {
+
+  // private like the fields of the sibling plan implementations; access goes through the accessors
+  private PartialPath prefixPath;
+
+  /** Creates an empty plan whose prefix path is filled in through the setter. */
+  public ActivateTemplatePlanImpl() {}
+
+  /**
+   * Creates a plan for the given prefix path.
+   *
+   * @param prefixPath the path stored as the prefix path
+   */
+  public ActivateTemplatePlanImpl(PartialPath prefixPath) {
+    this.prefixPath = prefixPath;
+  }
+
+  @Override
+  public PartialPath getPrefixPath() {
+    return prefixPath;
+  }
+
+  @Override
+  public void setPrefixPath(PartialPath prefixPath) {
+    this.prefixPath = prefixPath;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/AutoCreateDeviceMNodePlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/AutoCreateDeviceMNodePlanImpl.java
new file mode 100644
index 0000000000..2f1edf2598
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/AutoCreateDeviceMNodePlanImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+
+/** Plain field-backed implementation of {@link IAutoCreateDeviceMNodePlan}, holding one path. */
+public class AutoCreateDeviceMNodePlanImpl implements IAutoCreateDeviceMNodePlan {
+
+  private PartialPath path;
+
+  /** Creates an empty plan whose path is filled in through the setter. */
+  public AutoCreateDeviceMNodePlanImpl() {}
+
+  /**
+   * Creates a plan for the given path.
+   *
+   * @param path the path stored in this plan
+   */
+  public AutoCreateDeviceMNodePlanImpl(PartialPath path) {
+    this.path = path;
+  }
+
+  @Override
+  public PartialPath getPath() {
+    return path;
+  }
+
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ChangeAliasPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ChangeAliasPlanImpl.java
new file mode 100644
index 0000000000..efc3378cd6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ChangeAliasPlanImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+
+/** Plain field-backed implementation of {@link IChangeAliasPlan}, holding a path and an alias. */
+public class ChangeAliasPlanImpl implements IChangeAliasPlan {
+  private PartialPath path;
+  private String alias;
+
+  /** Creates an empty plan whose fields are filled in through the setters. */
+  public ChangeAliasPlanImpl() {}
+
+  /**
+   * Creates a plan with the given values.
+   *
+   * @param path the path stored in this plan
+   * @param alias the alias stored in this plan
+   */
+  public ChangeAliasPlanImpl(PartialPath path, String alias) {
+    this.path = path;
+    this.alias = alias;
+  }
+
+  @Override
+  public PartialPath getPath() {
+    return path;
+  }
+
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+
+  @Override
+  public String getAlias() {
+    return alias;
+  }
+
+  @Override
+  public void setAlias(String alias) {
+    this.alias = alias;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ChangeTagOffsetPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ChangeTagOffsetPlanImpl.java
new file mode 100644
index 0000000000..fa3d8345eb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/ChangeTagOffsetPlanImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+
+/**
+ * Plain field-backed implementation of {@link IChangeTagOffsetPlan}, holding a path and an offset.
+ *
+ * <p>NOTE(review): what the offset points into is not visible here; confirm against the users of
+ * this plan.
+ */
+public class ChangeTagOffsetPlanImpl implements IChangeTagOffsetPlan {
+  private PartialPath path;
+  private long offset;
+
+  /** Creates an empty plan whose fields are filled in through the setters. */
+  public ChangeTagOffsetPlanImpl() {}
+
+  /**
+   * Creates a plan with the given values.
+   *
+   * @param partialPath the path stored in this plan
+   * @param offset the offset stored in this plan
+   */
+  public ChangeTagOffsetPlanImpl(PartialPath partialPath, long offset) {
+    path = partialPath;
+    this.offset = offset;
+  }
+
+  @Override
+  public PartialPath getPath() {
+    return path;
+  }
+
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+
+  @Override
+  public long getOffset() {
+    return offset;
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/CreateAlignedTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/CreateAlignedTimeSeriesPlanImpl.java
new file mode 100644
index 0000000000..e2119f19ed
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/CreateAlignedTimeSeriesPlanImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeriesPlan {
+
+  private PartialPath devicePath;
+  private List<String> measurements;
+  private List<TSDataType> dataTypes;
+  private List<TSEncoding> encodings;
+  private List<CompressionType> compressors;
+  private List<String> aliasList;
+  private List<Map<String, String>> tagsList;
+  private List<Map<String, String>> attributesList;
+  private List<Long> tagOffsets = null;
+
+  public CreateAlignedTimeSeriesPlanImpl() {}
+
+  public CreateAlignedTimeSeriesPlanImpl(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> aliasList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList) {
+    this.devicePath = devicePath;
+    this.measurements = measurements;
+    this.dataTypes = dataTypes;
+    this.encodings = encodings;
+    this.compressors = compressors;
+    this.aliasList = aliasList;
+    this.tagsList = tagsList;
+    this.attributesList = attributesList;
+  }
+
+  public CreateAlignedTimeSeriesPlanImpl(
+      PartialPath devicePath, String measurement, MeasurementSchema schema) {
+    this.devicePath = devicePath;
+    this.measurements = Collections.singletonList(measurement);
+    this.dataTypes = Collections.singletonList(schema.getType());
+    this.encodings = Collections.singletonList(schema.getEncodingType());
+    this.compressors = Collections.singletonList(schema.getCompressor());
+  }
+
+  @Override
+  public PartialPath getDevicePath() {
+    return devicePath;
+  }
+
+  @Override
+  public void setDevicePath(PartialPath devicePath) {
+    this.devicePath = devicePath;
+  }
+
+  @Override
+  public List<String> getMeasurements() {
+    return measurements;
+  }
+
+  @Override
+  public void setMeasurements(List<String> measurements) {
+    this.measurements = measurements;
+  }
+
+  @Override
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  @Override
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  @Override
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  @Override
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  @Override
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  @Override
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  @Override
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  @Override
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  @Override
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  @Override
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  @Override
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  @Override
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  @Override
+  public List<Long> getTagOffsets() {
+    if (tagOffsets == null) {
+      tagOffsets = new ArrayList<>();
+      for (int i = 0; i < measurements.size(); i++) {
+        tagOffsets.add(Long.parseLong("-1"));
+      }
+    }
+    return tagOffsets;
+  }
+
+  @Override
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/CreateTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/CreateTimeSeriesPlanImpl.java
new file mode 100644
index 0000000000..ed349d86ed
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/CreateTimeSeriesPlanImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Mutable implementation of {@link ICreateTimeSeriesPlan}: a holder for the path, data type,
+ * encoding, compressor, alias, props, tags, attributes and tag offset of one time series.
+ */
+public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan {
+
+  private PartialPath path;
+  private TSDataType dataType;
+  private TSEncoding encoding;
+  private CompressionType compressor;
+  private String alias;
+  private Map<String, String> props = null;
+  private Map<String, String> tags = null;
+  private Map<String, String> attributes = null;
+
+  /** Starts at {@code -1} until another value is assigned. */
+  private long tagOffset = -1;
+
+  /** Creates an empty plan; fields keep their initial values until the setters are called. */
+  public CreateTimeSeriesPlanImpl() {}
+
+  /**
+   * Creates a fully specified plan. A non-null {@code props} map is copied into a new map whose
+   * keys compare case-insensitively; {@code tags} and {@code attributes} are stored as given.
+   *
+   * @param path path of the time series
+   * @param dataType data type
+   * @param encoding encoding
+   * @param compressor compression type
+   * @param props properties, may be {@code null}
+   * @param tags tags, may be {@code null}
+   * @param attributes attributes, may be {@code null}
+   * @param alias alias, may be {@code null}
+   */
+  public CreateTimeSeriesPlanImpl(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props,
+      Map<String, String> tags,
+      Map<String, String> attributes,
+      String alias) {
+    this.path = path;
+    this.alias = alias;
+    this.dataType = dataType;
+    this.encoding = encoding;
+    this.compressor = compressor;
+    this.tags = tags;
+    this.attributes = attributes;
+
+    if (props == null) {
+      return;
+    }
+    Map<String, String> caseInsensitiveProps = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    caseInsensitiveProps.putAll(props);
+    this.props = caseInsensitiveProps;
+  }
+
+  /**
+   * Creates a plan whose data type, encoding and compressor come from the given schema. Alias,
+   * props, tags and attributes stay {@code null}.
+   *
+   * @param path path of the time series
+   * @param schema source of the data type, encoding and compressor
+   */
+  public CreateTimeSeriesPlanImpl(PartialPath path, MeasurementSchema schema) {
+    this.path = path;
+    this.compressor = schema.getCompressor();
+    this.encoding = schema.getEncodingType();
+    this.dataType = schema.getType();
+  }
+
+  @Override
+  public PartialPath getPath() {
+    return this.path;
+  }
+
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+
+  @Override
+  public TSDataType getDataType() {
+    return this.dataType;
+  }
+
+  @Override
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public CompressionType getCompressor() {
+    return this.compressor;
+  }
+
+  @Override
+  public void setCompressor(CompressionType compressor) {
+    this.compressor = compressor;
+  }
+
+  @Override
+  public TSEncoding getEncoding() {
+    return this.encoding;
+  }
+
+  @Override
+  public void setEncoding(TSEncoding encoding) {
+    this.encoding = encoding;
+  }
+
+  @Override
+  public Map<String, String> getAttributes() {
+    return this.attributes;
+  }
+
+  @Override
+  public void setAttributes(Map<String, String> attributes) {
+    this.attributes = attributes;
+  }
+
+  @Override
+  public String getAlias() {
+    return this.alias;
+  }
+
+  @Override
+  public void setAlias(String alias) {
+    this.alias = alias;
+  }
+
+  @Override
+  public Map<String, String> getTags() {
+    return this.tags;
+  }
+
+  @Override
+  public void setTags(Map<String, String> tags) {
+    this.tags = tags;
+  }
+
+  @Override
+  public Map<String, String> getProps() {
+    return this.props;
+  }
+
+  /**
+   * Stores the given map as is; unlike the eight-argument constructor, no copy is made here.
+   *
+   * @param props replacement properties, may be {@code null}
+   */
+  @Override
+  public void setProps(Map<String, String> props) {
+    this.props = props;
+  }
+
+  @Override
+  public long getTagOffset() {
+    return this.tagOffset;
+  }
+
+  @Override
+  public void setTagOffset(long tagOffset) {
+    this.tagOffset = tagOffset;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/DeleteTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/DeleteTimeSeriesPlanImpl.java
new file mode 100644
index 0000000000..92682e4c36
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/DeleteTimeSeriesPlanImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+
+import java.util.List;
+
+/**
+ * Mutable implementation of {@link IDeleteTimeSeriesPlan} that stores the list of paths handed to
+ * it. The list is kept by reference, not copied.
+ */
+public class DeleteTimeSeriesPlanImpl implements IDeleteTimeSeriesPlan {
+
+  /** Paths carried by this plan; {@code null} until a list is assigned. */
+  private List<PartialPath> deletePathList;
+
+  /** Creates an empty plan whose path list is {@code null} until the setter is called. */
+  public DeleteTimeSeriesPlanImpl() {}
+
+  /**
+   * Creates a plan carrying the given paths.
+   *
+   * @param deletePathList paths to store in this plan
+   */
+  public DeleteTimeSeriesPlanImpl(List<PartialPath> deletePathList) {
+    setDeletePathListInternal(deletePathList);
+  }
+
+  /** @return the stored path list, possibly {@code null} */
+  @Override
+  public List<PartialPath> getDeletePathList() {
+    return this.deletePathList;
+  }
+
+  /** @param deletePathList replacement for the stored path list */
+  @Override
+  public void setDeletePathList(List<PartialPath> deletePathList) {
+    setDeletePathListInternal(deletePathList);
+  }
+
+  /** Single assignment point shared by the constructor and the public setter. */
+  private void setDeletePathListInternal(List<PartialPath> paths) {
+    this.deletePathList = paths;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/PreDeleteTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/PreDeleteTimeSeriesPlanImpl.java
new file mode 100644
index 0000000000..47f44a7740
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/PreDeleteTimeSeriesPlanImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+
+/** Mutable implementation of {@link IPreDeleteTimeSeriesPlan} that stores a single path. */
+public class PreDeleteTimeSeriesPlanImpl implements IPreDeleteTimeSeriesPlan {
+
+  /** Path carried by this plan; {@code null} until one is assigned. */
+  private PartialPath path;
+
+  /** Creates an empty plan whose path is {@code null} until the setter is called. */
+  public PreDeleteTimeSeriesPlanImpl() {}
+
+  /**
+   * Creates a plan carrying the given path.
+   *
+   * @param path path to store in this plan
+   */
+  public PreDeleteTimeSeriesPlanImpl(PartialPath path) {
+    this.path = path;
+  }
+
+  /** @return the stored path, possibly {@code null} */
+  @Override
+  public PartialPath getPath() {
+    return this.path;
+  }
+
+  /** @param path replacement for the stored path */
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/RollbackPreDeleteTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/RollbackPreDeleteTimeSeriesPlanImpl.java
new file mode 100644
index 0000000000..8f21e9fe8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/RollbackPreDeleteTimeSeriesPlanImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+
+/**
+ * Mutable implementation of {@link IRollbackPreDeleteTimeSeriesPlan} that stores a single path.
+ */
+public class RollbackPreDeleteTimeSeriesPlanImpl implements IRollbackPreDeleteTimeSeriesPlan {
+
+  /** Path carried by this plan; {@code null} until one is assigned. */
+  private PartialPath path;
+
+  /** Creates an empty plan whose path is {@code null} until the setter is called. */
+  public RollbackPreDeleteTimeSeriesPlanImpl() {}
+
+  /**
+   * Creates a plan carrying the given path.
+   *
+   * @param path path to store in this plan
+   */
+  public RollbackPreDeleteTimeSeriesPlanImpl(PartialPath path) {
+    this.path = path;
+  }
+
+  /** @return the stored path, possibly {@code null} */
+  @Override
+  public PartialPath getPath() {
+    return this.path;
+  }
+
+  /** @param path replacement for the stored path */
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanDeserializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanDeserializer.java
new file mode 100644
index 0000000000..f77568ad09
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanDeserializer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.metadata.logfile.IDeserializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Deserializes an {@link ISchemaRegionPlan} from binary data in a given {@link ByteBuffer}. The
+ * binary format is compatible with that of PhysicalPlan in 0.14 snapshot versions.
+ *
+ * <p>Decoding happens in two steps: the plan type is read from the buffer and used to obtain an
+ * empty plan from {@link SchemaRegionPlanFactory}; a visitor then fills that plan by reading the
+ * type-specific fields from the buffer in order.
+ */
+public class SchemaRegionPlanDeserializer implements IDeserializer<ISchemaRegionPlan> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegionPlanDeserializer.class);
+
+  /**
+   * Reads one plan from the buffer, starting at its current position.
+   *
+   * @param byteBuffer buffer positioned at the plan type marker
+   * @return the plan populated with the values read from the buffer
+   * @throws UnsupportedOperationException if the visitor has no dedicated handler for the plan
+   */
+  @Override
+  public ISchemaRegionPlan deserialize(ByteBuffer byteBuffer) {
+    ISchemaRegionPlan schemaRegionPlan =
+        SchemaRegionPlanFactory.getEmptyPlan(SchemaRegionPlanType.deserialize(byteBuffer));
+    return schemaRegionPlan.accept(new SchemaRegionPlanDeserializeVisitor(), byteBuffer);
+  }
+
+  /**
+   * Visitor that fills an empty plan from the buffer. Every handler reads the fields in a fixed
+   * order, so statements that touch the buffer must not be reordered.
+   *
+   * <p>When a path string cannot be parsed, the {@link IllegalPathException} is logged and the
+   * corresponding setter is simply not called; the rest of the record is still read.
+   */
+  private static class SchemaRegionPlanDeserializeVisitor
+      extends SchemaRegionPlanVisitor<ISchemaRegionPlan, ByteBuffer> {
+
+    /** Rejects the plan with an {@link UnsupportedOperationException} naming its type. */
+    @Override
+    public ISchemaRegionPlan visitSchemaRegionPlan(ISchemaRegionPlan plan, ByteBuffer byteBuffer) {
+      throw new UnsupportedOperationException(
+          String.format("%s plan doesn't support deserialization.", plan.getPlanType().name()));
+    }
+
+    /**
+     * Reads: activate path (string), template set level (int), template id (int), aligned flag
+     * (bool), then one long that is discarded.
+     */
+    @Override
+    public ISchemaRegionPlan visitActivateTemplateInCluster(
+        IActivateTemplateInClusterPlan activateTemplateInClusterPlan, ByteBuffer buffer) {
+      try {
+        activateTemplateInClusterPlan.setActivatePath(
+            new PartialPath(ReadWriteIOUtils.readString(buffer)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+      activateTemplateInClusterPlan.setTemplateSetLevel(ReadWriteIOUtils.readInt(buffer));
+      activateTemplateInClusterPlan.setTemplateId(ReadWriteIOUtils.readInt(buffer));
+      activateTemplateInClusterPlan.setAligned(ReadWriteIOUtils.readBool(buffer));
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+      return activateTemplateInClusterPlan;
+    }
+
+    /** Reads: prefix path (string), then one long that is discarded. */
+    @Override
+    public ISchemaRegionPlan visitActivateTemplate(
+        IActivateTemplatePlan activateTemplatePlan, ByteBuffer buffer) {
+      try {
+        activateTemplatePlan.setPrefixPath(new PartialPath(ReadWriteIOUtils.readString(buffer)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+      return activateTemplatePlan;
+    }
+
+    /** Reads: path (string), then one long that is discarded. */
+    @Override
+    public ISchemaRegionPlan visitAutoCreateDeviceMNode(
+        IAutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan, ByteBuffer buffer) {
+      try {
+        autoCreateDeviceMNodePlan.setPath(new PartialPath(ReadWriteIOUtils.readString(buffer)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+      return autoCreateDeviceMNodePlan;
+    }
+
+    /** Reads: path (string), alias (string). No trailing long is read for this plan. */
+    @Override
+    public ISchemaRegionPlan visitChangeAlias(IChangeAliasPlan changeAliasPlan, ByteBuffer buffer) {
+      try {
+        changeAliasPlan.setPath(new PartialPath(ReadWriteIOUtils.readString(buffer)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+      changeAliasPlan.setAlias(ReadWriteIOUtils.readString(buffer));
+      return changeAliasPlan;
+    }
+
+    /** Reads: path (string), offset (long). No trailing long is read for this plan. */
+    @Override
+    public ISchemaRegionPlan visitChangeTagOffset(
+        IChangeTagOffsetPlan changeTagOffsetPlan, ByteBuffer buffer) {
+      try {
+        changeTagOffsetPlan.setPath(new PartialPath(ReadWriteIOUtils.readString(buffer)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+      changeTagOffsetPlan.setOffset(buffer.getLong());
+      return changeTagOffsetPlan;
+    }
+
+    /**
+     * Reads: version mark (int, discarded), device path (int length + raw bytes), measurement
+     * count (int), then per-measurement names, data types, encodings, compressors and tag offsets,
+     * followed by three optional sections (aliases, tags, attributes), each guarded by a one-byte
+     * presence flag, and finally one long that is discarded.
+     */
+    @Override
+    public ISchemaRegionPlan visitCreateAlignedTimeSeries(
+        ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan, ByteBuffer buffer) {
+      // deserialize a version mark to adapt to old version
+      buffer.getInt();
+
+      int length = buffer.getInt();
+      byte[] bytes = new byte[length];
+      buffer.get(bytes);
+      try {
+        // NOTE(review): decodes with the platform default charset; this only round-trips if the
+        // writer encodes the path the same way - confirm against the serializer.
+        createAlignedTimeSeriesPlan.setDevicePath(new PartialPath(new String(bytes)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+
+      // Number of measurements; every per-measurement list below has this many entries.
+      int size = ReadWriteIOUtils.readInt(buffer);
+      List<String> measurements = new ArrayList<>();
+      for (int i = 0; i < size; i++) {
+        measurements.add(ReadWriteIOUtils.readString(buffer));
+      }
+      createAlignedTimeSeriesPlan.setMeasurements(measurements);
+
+      // Enum values are stored as one byte each and used as an index into values().
+      List<TSDataType> dataTypes = new ArrayList<>();
+      for (int i = 0; i < size; i++) {
+        dataTypes.add(TSDataType.values()[buffer.get()]);
+      }
+      createAlignedTimeSeriesPlan.setDataTypes(dataTypes);
+
+      List<TSEncoding> encodings = new ArrayList<>();
+      for (int i = 0; i < size; i++) {
+        encodings.add(TSEncoding.values()[buffer.get()]);
+      }
+      createAlignedTimeSeriesPlan.setEncodings(encodings);
+
+      List<CompressionType> compressors = new ArrayList<>();
+      for (int i = 0; i < size; i++) {
+        compressors.add(CompressionType.values()[buffer.get()]);
+      }
+      createAlignedTimeSeriesPlan.setCompressors(compressors);
+
+      List<Long> tagOffsets = new ArrayList<>();
+      for (int i = 0; i < size; i++) {
+        tagOffsets.add(buffer.getLong());
+      }
+      createAlignedTimeSeriesPlan.setTagOffsets(tagOffsets);
+
+      // alias
+      if (buffer.get() == 1) {
+        List<String> aliasList = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+          aliasList.add(ReadWriteIOUtils.readString(buffer));
+        }
+        createAlignedTimeSeriesPlan.setAliasList(aliasList);
+      }
+
+      // tags
+      if (buffer.get() == 1) {
+        List<Map<String, String>> tagsList = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+          tagsList.add(ReadWriteIOUtils.readMap(buffer));
+        }
+        createAlignedTimeSeriesPlan.setTagsList(tagsList);
+      }
+
+      // attributes
+      if (buffer.get() == 1) {
+        List<Map<String, String>> attributesList = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+          attributesList.add(ReadWriteIOUtils.readMap(buffer));
+        }
+        createAlignedTimeSeriesPlan.setAttributesList(attributesList);
+      }
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return createAlignedTimeSeriesPlan;
+    }
+
+    /**
+     * Reads: path (int length + raw bytes), data type, encoding and compressor (one byte each),
+     * tag offset (long), then four optional sections (alias, props, tags, attributes), each
+     * guarded by a one-byte presence flag, and finally one long that is discarded.
+     */
+    @Override
+    public ISchemaRegionPlan visitCreateTimeSeries(
+        ICreateTimeSeriesPlan createTimeSeriesPlan, ByteBuffer buffer) {
+      int length = buffer.getInt();
+      byte[] bytes = new byte[length];
+      buffer.get(bytes);
+      try {
+        // NOTE(review): decodes with the platform default charset; this only round-trips if the
+        // writer encodes the path the same way - confirm against the serializer.
+        createTimeSeriesPlan.setPath(new PartialPath(new String(bytes)));
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+
+      // Enum values are stored as one byte each and used as an index into values().
+      createTimeSeriesPlan.setDataType(TSDataType.values()[buffer.get()]);
+      createTimeSeriesPlan.setEncoding(TSEncoding.values()[buffer.get()]);
+      createTimeSeriesPlan.setCompressor(CompressionType.values()[buffer.get()]);
+      createTimeSeriesPlan.setTagOffset(buffer.getLong());
+
+      // alias
+      if (buffer.get() == 1) {
+        createTimeSeriesPlan.setAlias(ReadWriteIOUtils.readString(buffer));
+      }
+
+      // props
+      if (buffer.get() == 1) {
+        createTimeSeriesPlan.setProps(ReadWriteIOUtils.readMap(buffer));
+      }
+
+      // tags
+      if (buffer.get() == 1) {
+        createTimeSeriesPlan.setTags(ReadWriteIOUtils.readMap(buffer));
+      }
+
+      // attributes
+      if (buffer.get() == 1) {
+        createTimeSeriesPlan.setAttributes(ReadWriteIOUtils.readMap(buffer));
+      }
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return createTimeSeriesPlan;
+    }
+
+    /**
+     * Reads: path count (int), that many path strings, then one long that is discarded. If any
+     * path string is malformed the error is logged and the plan's path list is left unset.
+     */
+    @Override
+    public ISchemaRegionPlan visitDeleteTimeSeries(
+        IDeleteTimeSeriesPlan deleteTimeSeriesPlan, ByteBuffer buffer) {
+      int pathNumber = buffer.getInt();
+
+      // Consume every serialized path before parsing any of them, so that a malformed path cannot
+      // leave the buffer positioned in the middle of the record when the trailing long is read.
+      List<String> rawPaths = new ArrayList<>();
+      for (int i = 0; i < pathNumber; i++) {
+        rawPaths.add(ReadWriteIOUtils.readString(buffer));
+      }
+
+      try {
+        List<PartialPath> deletePathList = new ArrayList<>();
+        for (String rawPath : rawPaths) {
+          deletePathList.add(new PartialPath(rawPath));
+        }
+        deleteTimeSeriesPlan.setDeletePathList(deletePathList);
+      } catch (IllegalPathException e) {
+        LOGGER.error("Cannot deserialize SchemaRegionPlan from buffer", e);
+      }
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return deleteTimeSeriesPlan;
+    }
+
+    /**
+     * Reads: path (decoded by {@link PathDeserializeUtil} and cast to {@link PartialPath}), then
+     * one long that is discarded.
+     */
+    @Override
+    public ISchemaRegionPlan visitPreDeleteTimeSeries(
+        IPreDeleteTimeSeriesPlan preDeleteTimeSeriesPlan, ByteBuffer buffer) {
+      preDeleteTimeSeriesPlan.setPath((PartialPath) PathDeserializeUtil.deserialize(buffer));
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return preDeleteTimeSeriesPlan;
+    }
+
+    /**
+     * Reads: path (decoded by {@link PathDeserializeUtil} and cast to {@link PartialPath}), then
+     * one long that is discarded.
+     */
+    @Override
+    public ISchemaRegionPlan visitRollbackPreDeleteTimeSeries(
+        IRollbackPreDeleteTimeSeriesPlan rollbackPreDeleteTimeSeriesPlan, ByteBuffer buffer) {
+      rollbackPreDeleteTimeSeriesPlan.setPath(
+          (PartialPath) PathDeserializeUtil.deserialize(buffer));
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return rollbackPreDeleteTimeSeriesPlan;
+    }
+
+    /** Reads: template name (string), prefix path (string), then one long that is discarded. */
+    @Override
+    public ISchemaRegionPlan visitSetTemplate(ISetTemplatePlan setTemplatePlan, ByteBuffer buffer) {
+      setTemplatePlan.setTemplateName(ReadWriteIOUtils.readString(buffer));
+      setTemplatePlan.setPrefixPath(ReadWriteIOUtils.readString(buffer));
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return setTemplatePlan;
+    }
+
+    /**
+     * Reads: prefix path (string), template name (string), then one long that is discarded. Note
+     * that the two strings come in the opposite order from {@code visitSetTemplate}.
+     */
+    @Override
+    public ISchemaRegionPlan visitUnsetTemplate(
+        IUnsetTemplatePlan unsetTemplatePlan, ByteBuffer buffer) {
+      unsetTemplatePlan.setPrefixPath(ReadWriteIOUtils.readString(buffer));
+      unsetTemplatePlan.setTemplateName(ReadWriteIOUtils.readString(buffer));
+
+      // deserialize a long to keep compatible with old version (raft index)
+      buffer.getLong();
+
+      return unsetTemplatePlan;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanFactory.java
new file mode 100644
index 0000000000..7a2ccb0e84
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.List;
+import java.util.Map;
+
+public class SchemaRegionPlanFactory {
+
+  private SchemaRegionPlanFactory() {}
+
+  public static ISchemaRegionPlan getEmptyPlan(SchemaRegionPlanType planType) {
+    switch (planType) {
+      case CREATE_TIMESERIES:
+        return new CreateTimeSeriesPlanImpl();
+      case DELETE_TIMESERIES:
+        return new DeleteTimeSeriesPlanImpl();
+      case CHANGE_TAG_OFFSET:
+        return new ChangeTagOffsetPlanImpl();
+      case CHANGE_ALIAS:
+        return new ChangeAliasPlanImpl();
+      case SET_TEMPLATE:
+        return new SetTemplatePlanImpl();
+      case ACTIVATE_TEMPLATE:
+        return new ActivateTemplatePlanImpl();
+      case AUTO_CREATE_DEVICE_MNODE:
+        return new AutoCreateDeviceMNodePlanImpl();
+      case CREATE_ALIGNED_TIMESERIES:
+        return new CreateAlignedTimeSeriesPlanImpl();
+      case UNSET_TEMPLATE:
+        return new UnsetTemplatePlanImpl();
+      case ACTIVATE_TEMPLATE_IN_CLUSTER:
+        return new ActivateTemplateInClusterPlanImpl();
+      case PRE_DELETE_TIMESERIES_IN_CLUSTER:
+        return new PreDeleteTimeSeriesPlanImpl();
+      case ROLLBACK_PRE_DELETE_TIMESERIES:
+        return new RollbackPreDeleteTimeSeriesPlanImpl();
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "SchemaRegionPlan of type %s doesn't support creating empty plan.",
+                planType.name()));
+    }
+  }
+
+  public static IChangeAliasPlan getChangeAliasPlan(PartialPath path, String alias) {
+    return new ChangeAliasPlanImpl(path, alias);
+  }
+
+  public static IChangeTagOffsetPlan getChangeTagOffsetPlan(PartialPath fullPath, long tagOffset) {
+    return new ChangeTagOffsetPlanImpl(fullPath, tagOffset);
+  }
+
+  public static IActivateTemplatePlan getActivateTemplatePlan(PartialPath path) {
+    return new ActivateTemplatePlanImpl(path);
+  }
+
+  public static IAutoCreateDeviceMNodePlan getAutoCreateDeviceMNodePlan(PartialPath path) {
+    return new AutoCreateDeviceMNodePlanImpl(path);
+  }
+
+  public static ICreateTimeSeriesPlan getCreateTimeSeriesPlan(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props,
+      Map<String, String> tags,
+      Map<String, String> attributes,
+      String alias) {
+    return new CreateTimeSeriesPlanImpl(
+        path, dataType, encoding, compressor, props, tags, attributes, alias);
+  }
+
+  public static ICreateAlignedTimeSeriesPlan getCreateAlignedTimeSeriesPlan(
+      PartialPath prefixPath,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> aliasList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList) {
+    return new CreateAlignedTimeSeriesPlanImpl(
+        prefixPath,
+        measurements,
+        dataTypes,
+        encodings,
+        compressors,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  public static IDeleteTimeSeriesPlan getDeleteTimeSeriesPlan(List<PartialPath> pathList) {
+    return new DeleteTimeSeriesPlanImpl(pathList);
+  }
+
+  public static IPreDeleteTimeSeriesPlan getPreDeleteTimeSeriesPlan(PartialPath path) {
+    return new PreDeleteTimeSeriesPlanImpl(path);
+  }
+
+  public static IRollbackPreDeleteTimeSeriesPlan getRollbackPreDeleteTimeSeriesPlan(
+      PartialPath path) {
+    return new RollbackPreDeleteTimeSeriesPlanImpl(path);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanSerializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanSerializer.java
new file mode 100644
index 0000000000..9bed473b8f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanSerializer.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.logfile.ISerializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class provides the ability to serialize the given SchemaRegionPlan into binary data, which
+ * is compatible with that of PhysicalPlan in 0.14 snapshot versions.
+ */
+@NotThreadSafe
+public class SchemaRegionPlanSerializer implements ISerializer<ISchemaRegionPlan> {
+
+  public final ConfigurableDataOutputStream dataOutputStream =
+      new ConfigurableDataOutputStream(null);
+
+  /**
+   * Writes {@code plan} to {@code outputStream}: the plan type first, followed by the plan's
+   * attributes as written by the matching visit method of the serialize visitor.
+   *
+   * @param plan the plan to serialize
+   * @param outputStream the stream that receives the bytes
+   * @throws IOException if writing fails; failures inside the visitor are carried back in the
+   *     result object and rethrown here
+   */
+  @Override
+  public void serialize(ISchemaRegionPlan plan, OutputStream outputStream) throws IOException {
+    // point the reusable wrapper at the caller's stream
+    dataOutputStream.changeOutputStream(outputStream);
+
+    // plan type goes first
+    plan.getPlanType().serialize(dataOutputStream);
+
+    // then the plan attributes
+    SchemaRegionPlanSerializeVisitor serializeVisitor = new SchemaRegionPlanSerializeVisitor();
+    SchemaRegionPlanSerializationResult outcome = plan.accept(serializeVisitor, dataOutputStream);
+    if (!outcome.isFailed()) {
+      return;
+    }
+    throw outcome.getException();
+  }
+
+  /** A {@link DataOutputStream} whose underlying stream can be replaced after construction. */
+  private static class ConfigurableDataOutputStream extends DataOutputStream {
+
+    private ConfigurableDataOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    /** Resets the inherited written-bytes counter to zero and switches to {@code out}. */
+    private void changeOutputStream(OutputStream out) {
+      this.written = 0;
+      this.out = out;
+    }
+  }
+
+  /**
+   * Outcome of one visit call: either the shared {@code SUCCESS} instance (no exception) or an
+   * instance holding the {@link IOException} that was caught while writing.
+   */
+  private static class SchemaRegionPlanSerializationResult {
+
+    /** Shared result for a serialization that completed without an exception. */
+    private static final SchemaRegionPlanSerializationResult SUCCESS =
+        new SchemaRegionPlanSerializationResult(null);
+
+    /** The captured failure, or {@code null} for success. */
+    private final IOException exception;
+
+    private SchemaRegionPlanSerializationResult(IOException exception) {
+      this.exception = exception;
+    }
+
+    /** Returns {@code true} when an exception was recorded. */
+    private boolean isFailed() {
+      return this.exception != null;
+    }
+
+    /** Returns the recorded exception, which is {@code null} for a successful result. */
+    private IOException getException() {
+      return this.exception;
+    }
+  }
+
+  private static class SchemaRegionPlanSerializeVisitor
+      extends SchemaRegionPlanVisitor<SchemaRegionPlanSerializationResult, DataOutputStream> {
+    @Override
+    public SchemaRegionPlanSerializationResult visitSchemaRegionPlan(
+        ISchemaRegionPlan plan, DataOutputStream dataOutputStream) {
+      throw new UnsupportedOperationException(
+          String.format("%s plan doesn't support serialization.", plan.getPlanType().name()));
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitActivateTemplateInCluster(
+        IActivateTemplateInClusterPlan activateTemplateInClusterPlan,
+        DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(
+            activateTemplateInClusterPlan.getActivatePath().getFullPath(), dataOutputStream);
+        dataOutputStream.writeInt(activateTemplateInClusterPlan.getTemplateSetLevel());
+        dataOutputStream.writeInt(activateTemplateInClusterPlan.getTemplateId());
+        dataOutputStream.writeBoolean(activateTemplateInClusterPlan.isAligned());
+        // serialize a long to keep compatible with old version (raft index)
+        dataOutputStream.writeLong(0);
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitActivateTemplate(
+        IActivateTemplatePlan activateTemplatePlan, DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(
+            activateTemplatePlan.getPrefixPath().getFullPath(), dataOutputStream);
+        // serialize a long to keep compatible with old version (raft index)
+        dataOutputStream.writeLong(0);
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitAutoCreateDeviceMNode(
+        IAutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan, DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(autoCreateDeviceMNodePlan.getPath().getFullPath(), dataOutputStream);
+        // serialize a long to keep compatible with old version (raft index)
+        dataOutputStream.writeLong(0);
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitChangeAlias(
+        IChangeAliasPlan changeAliasPlan, DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(changeAliasPlan.getPath().getFullPath(), dataOutputStream);
+        ReadWriteIOUtils.write(changeAliasPlan.getAlias(), dataOutputStream);
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitChangeTagOffset(
+        IChangeTagOffsetPlan changeTagOffsetPlan, DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(changeTagOffsetPlan.getPath().getFullPath(), dataOutputStream);
+        dataOutputStream.writeLong(changeTagOffsetPlan.getOffset());
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    /**
+     * Writes an aligned-timeseries creation plan. Layout, in order: an int {@code -1}; the device
+     * path as int length plus raw bytes; the measurement count and each measurement string; one
+     * byte per data type, encoding and compressor (their ordinals); one long per tag offset; the
+     * alias list, tags list and attributes list, each preceded by a presence byte (0 when the list
+     * is null or empty, otherwise 1); and finally a placeholder long.
+     *
+     * @return {@code SUCCESS}, or a result holding the {@link IOException} caught while writing
+     */
+    @Override
+    public SchemaRegionPlanSerializationResult visitCreateAlignedTimeSeries(
+        ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan,
+        DataOutputStream dataOutputStream) {
+      try {
+        // leading int kept for compatibility with the old version
+        dataOutputStream.writeInt(-1);
+
+        // NOTE(review): getBytes() without a charset uses the JVM default charset - confirm the
+        // reader decodes these bytes the same way.
+        byte[] devicePathBytes =
+            createAlignedTimeSeriesPlan.getDevicePath().getFullPath().getBytes();
+        dataOutputStream.writeInt(devicePathBytes.length);
+        dataOutputStream.write(devicePathBytes);
+
+        List<String> measurementList = createAlignedTimeSeriesPlan.getMeasurements();
+        dataOutputStream.writeInt(measurementList.size());
+        for (String measurementName : measurementList) {
+          ReadWriteIOUtils.write(measurementName, dataOutputStream);
+        }
+
+        for (TSDataType type : createAlignedTimeSeriesPlan.getDataTypes()) {
+          dataOutputStream.writeByte(type.ordinal());
+        }
+        for (TSEncoding encoding : createAlignedTimeSeriesPlan.getEncodings()) {
+          dataOutputStream.writeByte(encoding.ordinal());
+        }
+        for (CompressionType compressor : createAlignedTimeSeriesPlan.getCompressors()) {
+          dataOutputStream.writeByte(compressor.ordinal());
+        }
+        for (long offset : createAlignedTimeSeriesPlan.getTagOffsets()) {
+          dataOutputStream.writeLong(offset);
+        }
+
+        // alias list, preceded by its presence byte
+        List<String> aliasList = createAlignedTimeSeriesPlan.getAliasList();
+        boolean hasAlias = aliasList != null && !aliasList.isEmpty();
+        dataOutputStream.writeByte(hasAlias ? 1 : 0);
+        if (hasAlias) {
+          for (String alias : aliasList) {
+            ReadWriteIOUtils.write(alias, dataOutputStream);
+          }
+        }
+
+        // tags list, then attributes list
+        writeFlaggedMapList(createAlignedTimeSeriesPlan.getTagsList(), dataOutputStream);
+        writeFlaggedMapList(createAlignedTimeSeriesPlan.getAttributesList(), dataOutputStream);
+
+        // placeholder long kept for compatibility with the old version (raft index)
+        dataOutputStream.writeLong(0);
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+      return SchemaRegionPlanSerializationResult.SUCCESS;
+    }
+
+    /** Writes byte 0 for a null or empty list; otherwise byte 1 followed by every map in order. */
+    private static void writeFlaggedMapList(
+        List<Map<String, String>> mapList, DataOutputStream dataOutputStream) throws IOException {
+      if (mapList == null || mapList.isEmpty()) {
+        dataOutputStream.writeByte(0);
+        return;
+      }
+      dataOutputStream.writeByte(1);
+      for (Map<String, String> entryMap : mapList) {
+        ReadWriteIOUtils.write(entryMap, dataOutputStream);
+      }
+    }
+
+    /**
+     * Writes a timeseries creation plan. Layout, in order: the full path as int length plus raw
+     * bytes; one byte each for the data type, encoding and compressor ordinals; the tag offset
+     * (long); the alias preceded by a presence byte (0 when null, otherwise 1); props, tags and
+     * attributes, each preceded by a presence byte (0 when the map is null or empty, otherwise 1);
+     * and finally a placeholder long.
+     *
+     * @return {@code SUCCESS}, or a result holding the {@link IOException} caught while writing
+     */
+    @Override
+    public SchemaRegionPlanSerializationResult visitCreateTimeSeries(
+        ICreateTimeSeriesPlan createTimeSeriesPlan, DataOutputStream dataOutputStream) {
+      try {
+        // NOTE(review): getBytes() without a charset uses the JVM default charset - confirm the
+        // reader decodes these bytes the same way.
+        byte[] pathBytes = createTimeSeriesPlan.getPath().getFullPath().getBytes();
+        dataOutputStream.writeInt(pathBytes.length);
+        dataOutputStream.write(pathBytes);
+
+        dataOutputStream.writeByte(createTimeSeriesPlan.getDataType().ordinal());
+        dataOutputStream.writeByte(createTimeSeriesPlan.getEncoding().ordinal());
+        dataOutputStream.writeByte(createTimeSeriesPlan.getCompressor().ordinal());
+        dataOutputStream.writeLong(createTimeSeriesPlan.getTagOffset());
+
+        // alias, preceded by its presence byte
+        String alias = createTimeSeriesPlan.getAlias();
+        if (alias != null) {
+          dataOutputStream.writeByte(1);
+          ReadWriteIOUtils.write(alias, dataOutputStream);
+        } else {
+          dataOutputStream.writeByte(0);
+        }
+
+        // props, tags, attributes - in that order
+        writeFlaggedMap(createTimeSeriesPlan.getProps(), dataOutputStream);
+        writeFlaggedMap(createTimeSeriesPlan.getTags(), dataOutputStream);
+        writeFlaggedMap(createTimeSeriesPlan.getAttributes(), dataOutputStream);
+
+        // placeholder long kept for compatibility with the old version (raft index)
+        dataOutputStream.writeLong(0);
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+      return SchemaRegionPlanSerializationResult.SUCCESS;
+    }
+
+    /** Writes byte 0 for a null or empty map; otherwise byte 1 followed by the map itself. */
+    private static void writeFlaggedMap(
+        Map<String, String> entryMap, DataOutputStream dataOutputStream) throws IOException {
+      if (entryMap == null || entryMap.isEmpty()) {
+        dataOutputStream.writeByte(0);
+        return;
+      }
+      dataOutputStream.writeByte(1);
+      ReadWriteIOUtils.write(entryMap, dataOutputStream);
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitDeleteTimeSeries(
+        IDeleteTimeSeriesPlan deleteTimeSeriesPlan, DataOutputStream dataOutputStream) {
+      try {
+        List<PartialPath> deletePathList = deleteTimeSeriesPlan.getDeletePathList();
+        dataOutputStream.writeInt(deletePathList.size());
+        for (PartialPath path : deletePathList) {
+          ReadWriteIOUtils.write(path.getFullPath(), dataOutputStream);
+        }
+
+        // serialize a long to keep compatible with old version (raft index)
+        dataOutputStream.writeLong(0);
+
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    /**
+     * Lets the plan's path serialize itself into the stream, then writes a placeholder long.
+     *
+     * @return {@code SUCCESS}, or a result holding the {@link IOException} caught while writing
+     */
+    @Override
+    public SchemaRegionPlanSerializationResult visitPreDeleteTimeSeries(
+        IPreDeleteTimeSeriesPlan preDeleteTimeSeriesPlan, DataOutputStream dataOutputStream) {
+      try {
+        preDeleteTimeSeriesPlan.getPath().serialize(dataOutputStream);
+        // placeholder long kept for compatibility with the old version (raft index)
+        dataOutputStream.writeLong(0);
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+      return SchemaRegionPlanSerializationResult.SUCCESS;
+    }
+
+    /**
+     * Lets the plan's path serialize itself into the stream, then writes a placeholder long.
+     *
+     * @return {@code SUCCESS}, or a result holding the {@link IOException} caught while writing
+     */
+    @Override
+    public SchemaRegionPlanSerializationResult visitRollbackPreDeleteTimeSeries(
+        IRollbackPreDeleteTimeSeriesPlan rollbackPreDeleteTimeSeriesPlan,
+        DataOutputStream dataOutputStream) {
+      try {
+        rollbackPreDeleteTimeSeriesPlan.getPath().serialize(dataOutputStream);
+        // placeholder long kept for compatibility with the old version (raft index)
+        dataOutputStream.writeLong(0);
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+      return SchemaRegionPlanSerializationResult.SUCCESS;
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitSetTemplate(
+        ISetTemplatePlan setTemplatePlan, DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(setTemplatePlan.getTemplateName(), dataOutputStream);
+        ReadWriteIOUtils.write(setTemplatePlan.getPrefixPath(), dataOutputStream);
+
+        // serialize a long to keep compatible with old version (raft index)
+        dataOutputStream.writeLong(0);
+
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+
+    @Override
+    public SchemaRegionPlanSerializationResult visitUnsetTemplate(
+        IUnsetTemplatePlan unsetTemplatePlan, DataOutputStream dataOutputStream) {
+      try {
+        ReadWriteIOUtils.write(unsetTemplatePlan.getPrefixPath(), dataOutputStream);
+        ReadWriteIOUtils.write(unsetTemplatePlan.getTemplateName(), dataOutputStream);
+
+        // serialize a long to keep compatible with old version (raft index)
+        dataOutputStream.writeLong(0);
+
+        return SchemaRegionPlanSerializationResult.SUCCESS;
+      } catch (IOException e) {
+        return new SchemaRegionPlanSerializationResult(e);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanTxtSerializer.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanTxtSerializer.java
new file mode 100644
index 0000000000..775e77d241
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SchemaRegionPlanTxtSerializer.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
+
+import org.apache.iotdb.db.metadata.logfile.ISerializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides the ability to serialize the given SchemaRegionPlan into a line of text
+ * (written to the stream as bytes), which is used for MLogParser.
+ */
+public class SchemaRegionPlanTxtSerializer implements ISerializer<ISchemaRegionPlan> {
+
+  private static final String FIELD_SEPARATOR = ", ";
+  private static final String LINE_SEPARATOR = System.lineSeparator();
+
+  /**
+   * Writes one text line describing {@code plan} to {@code outputStream}: the plan type name, a
+   * field separator, the plan attributes appended by the text visitor, and the platform line
+   * separator.
+   *
+   * @param plan the plan to render
+   * @param outputStream the stream that receives the encoded line
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(ISchemaRegionPlan plan, OutputStream outputStream) throws IOException {
+    DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
+    StringBuilder stringBuilder = new StringBuilder();
+    // serialize plan type
+    stringBuilder.append(plan.getPlanType().name()).append(FIELD_SEPARATOR);
+    // serialize plan attributes
+    plan.accept(new SchemaRegionPlanTxtSerializeVisitor(), stringBuilder);
+    stringBuilder.append(LINE_SEPARATOR);
+    // Encode with an explicit charset: a bare getBytes() uses the JVM default charset, which makes
+    // the produced text depend on the platform it was generated on.
+    dataOutputStream.write(
+        stringBuilder.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+  private static class SchemaRegionPlanTxtSerializeVisitor
+      extends SchemaRegionPlanVisitor<Void, StringBuilder> {
+    @Override
+    public Void visitSchemaRegionPlan(ISchemaRegionPlan plan, StringBuilder stringBuilder) {
+      throw new UnsupportedOperationException(
+          String.format("%s plan doesn't support serialization.", plan.getPlanType().name()));
+    }
+
+    /**
+     * Appends the activate path's full path, the template set level and the aligned flag, separated
+     * by the field separator.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitActivateTemplateInCluster(
+        IActivateTemplateInClusterPlan activateTemplateInClusterPlan, StringBuilder stringBuilder) {
+      // NOTE(review): the template id is not part of the text output - confirm this is intended.
+      stringBuilder.append(activateTemplateInClusterPlan.getActivatePath().getFullPath());
+      stringBuilder.append(FIELD_SEPARATOR);
+      stringBuilder.append(activateTemplateInClusterPlan.getTemplateSetLevel());
+      stringBuilder.append(FIELD_SEPARATOR);
+      stringBuilder.append(activateTemplateInClusterPlan.isAligned());
+      return null;
+    }
+
+    @Override
+    public Void visitActivateTemplate(
+        IActivateTemplatePlan activateTemplatePlan, StringBuilder stringBuilder) {
+      stringBuilder.append(activateTemplatePlan.getPrefixPath().getFullPath());
+      return null;
+    }
+
+    @Override
+    public Void visitAutoCreateDeviceMNode(
+        IAutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan, StringBuilder stringBuilder) {
+      stringBuilder.append(autoCreateDeviceMNodePlan.getPath().getFullPath());
+      return null;
+    }
+
+    /**
+     * Appends the plan path's full path string and the alias, separated by the field separator.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitChangeAlias(IChangeAliasPlan changeAliasPlan, StringBuilder stringBuilder) {
+      stringBuilder.append(changeAliasPlan.getPath().getFullPath());
+      stringBuilder.append(FIELD_SEPARATOR);
+      stringBuilder.append(changeAliasPlan.getAlias());
+      return null;
+    }
+
+    /**
+     * Appends the plan path's full path string and the tag offset, separated by the field
+     * separator.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitChangeTagOffset(
+        IChangeTagOffsetPlan changeTagOffsetPlan, StringBuilder stringBuilder) {
+      stringBuilder.append(changeTagOffsetPlan.getPath().getFullPath());
+      stringBuilder.append(FIELD_SEPARATOR);
+      stringBuilder.append(changeTagOffsetPlan.getOffset());
+      return null;
+    }
+
+    /**
+     * Appends, separated by the field separator: the device path's full path, the measurements,
+     * the names of the data types, encodings and compressors, then the alias list, tags list and
+     * attributes list (each left empty when null), and finally the tag offsets.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitCreateAlignedTimeSeries(
+        ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan, StringBuilder stringBuilder) {
+      stringBuilder.append(createAlignedTimeSeriesPlan.getDevicePath().getFullPath());
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      stringBuilder.append(createAlignedTimeSeriesPlan.getMeasurements());
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      // enum lists are rendered through their constant names
+      stringBuilder.append(
+          createAlignedTimeSeriesPlan.getDataTypes().stream()
+              .map(dataType -> dataType.name())
+              .collect(Collectors.toList()));
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      stringBuilder.append(
+          createAlignedTimeSeriesPlan.getEncodings().stream()
+              .map(encoding -> encoding.name())
+              .collect(Collectors.toList()));
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      stringBuilder.append(
+          createAlignedTimeSeriesPlan.getCompressors().stream()
+              .map(compressor -> compressor.name())
+              .collect(Collectors.toList()));
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      // optional lists: a null list leaves the field empty, the separator is still appended
+      if (null != createAlignedTimeSeriesPlan.getAliasList()) {
+        stringBuilder.append(createAlignedTimeSeriesPlan.getAliasList());
+      }
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      if (null != createAlignedTimeSeriesPlan.getTagsList()) {
+        stringBuilder.append(createAlignedTimeSeriesPlan.getTagsList());
+      }
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      if (null != createAlignedTimeSeriesPlan.getAttributesList()) {
+        stringBuilder.append(createAlignedTimeSeriesPlan.getAttributesList());
+      }
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      stringBuilder.append(createAlignedTimeSeriesPlan.getTagOffsets());
+      return null;
+    }
+
+    /**
+     * Appends, separated by the field separator: the full path, the data type, encoding and
+     * compressor names, then the alias, tags and attributes (each left empty when null), and
+     * finally the tag offset.
+     *
+     * <p>The path is rendered with {@code getFullPath()}, like every other visit method of this
+     * visitor, instead of relying on the path object's {@code toString()}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitCreateTimeSeries(
+        ICreateTimeSeriesPlan createTimeSeriesPlan, StringBuilder stringBuilder) {
+      stringBuilder
+          .append(createTimeSeriesPlan.getPath().getFullPath())
+          .append(FIELD_SEPARATOR)
+          .append(createTimeSeriesPlan.getDataType().name())
+          .append(FIELD_SEPARATOR)
+          .append(createTimeSeriesPlan.getEncoding().name())
+          .append(FIELD_SEPARATOR)
+          .append(createTimeSeriesPlan.getCompressor().name())
+          .append(FIELD_SEPARATOR);
+
+      // optional fields: a null value leaves the field empty, the separator is still appended
+      // NOTE(review): props are not part of the text output - confirm this is intended.
+      if (createTimeSeriesPlan.getAlias() != null) {
+        stringBuilder.append(createTimeSeriesPlan.getAlias());
+      }
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      if (createTimeSeriesPlan.getTags() != null) {
+        stringBuilder.append(createTimeSeriesPlan.getTags());
+      }
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      if (createTimeSeriesPlan.getAttributes() != null) {
+        stringBuilder.append(createTimeSeriesPlan.getAttributes());
+      }
+      stringBuilder.append(FIELD_SEPARATOR);
+
+      stringBuilder.append(createTimeSeriesPlan.getTagOffset());
+
+      return null;
+    }
+
+    @Override
+    public Void visitDeleteTimeSeries(
+        IDeleteTimeSeriesPlan deleteTimeSeriesPlan, StringBuilder stringBuilder) {
+      stringBuilder.append(deleteTimeSeriesPlan.getDeletePathList());
+      return null;
+    }
+
+    @Override
+    public Void visitPreDeleteTimeSeries(
+        IPreDeleteTimeSeriesPlan preDeleteTimeSeriesPlan, StringBuilder stringBuilder) {
+      stringBuilder.append(preDeleteTimeSeriesPlan.getPath().getFullPath());
+      return null;
+    }
+
+    @Override
+    public Void visitRollbackPreDeleteTimeSeries(
+        IRollbackPreDeleteTimeSeriesPlan rollbackPreDeleteTimeSeriesPlan,
+        StringBuilder stringBuilder) {
+      stringBuilder.append(rollbackPreDeleteTimeSeriesPlan.getPath().getFullPath());
+      return null;
+    }
+
+    /**
+     * Appends the prefix path and the template name, separated by the field separator.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitSetTemplate(ISetTemplatePlan setTemplatePlan, StringBuilder stringBuilder) {
+      stringBuilder.append(setTemplatePlan.getPrefixPath());
+      stringBuilder.append(FIELD_SEPARATOR);
+      stringBuilder.append(setTemplatePlan.getTemplateName());
+      return null;
+    }
+
+    /**
+     * Appends the prefix path and the template name, separated by the field separator.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void visitUnsetTemplate(
+        IUnsetTemplatePlan unsetTemplatePlan, StringBuilder stringBuilder) {
+      stringBuilder.append(unsetTemplatePlan.getPrefixPath());
+      stringBuilder.append(FIELD_SEPARATOR);
+      stringBuilder.append(unsetTemplatePlan.getTemplateName());
+      return null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SetTemplatePlanImpl.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SetTemplatePlanImpl.java
index 5ab66113e8..3b5bf67124 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/SetTemplatePlanImpl.java
@@ -17,31 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.qp.physical.sys;
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
+public class SetTemplatePlanImpl implements ISetTemplatePlan {
 
-public class SetTemplatePlan extends PhysicalPlan {
-  String templateName;
-  String prefixPath;
+  private String templateName;
 
-  public SetTemplatePlan() {
-    super(OperatorType.SET_TEMPLATE);
-  }
+  private String prefixPath;
+
+  public SetTemplatePlanImpl() {}
 
-  public SetTemplatePlan(String templateName, String prefixPath) throws IllegalPathException {
-    super(OperatorType.SET_TEMPLATE);
+  public SetTemplatePlanImpl(String templateName, String prefixPath) throws IllegalPathException {
 
     String[] pathNodes = PathUtils.splitPathToDetachedNodes(prefixPath);
     for (String s : pathNodes) {
@@ -56,52 +47,23 @@ public class SetTemplatePlan extends PhysicalPlan {
     this.prefixPath = prefixPath;
   }
 
+  @Override
   public String getTemplateName() {
     return templateName;
   }
 
+  @Override
   public void setTemplateName(String templateName) {
     this.templateName = templateName;
   }
 
+  @Override
   public String getPrefixPath() {
     return prefixPath;
   }
 
+  @Override
   public void setPrefixPath(String prefixPath) {
     this.prefixPath = prefixPath;
   }
-
-  @Override
-  public List<PartialPath> getPaths() {
-    return null;
-  }
-
-  @Override
-  public void serializeImpl(ByteBuffer buffer) {
-    buffer.put((byte) PhysicalPlanType.SET_TEMPLATE.ordinal());
-
-    ReadWriteIOUtils.write(templateName, buffer);
-    ReadWriteIOUtils.write(prefixPath, buffer);
-
-    buffer.putLong(index);
-  }
-
-  @Override
-  public void deserialize(ByteBuffer buffer) {
-    templateName = ReadWriteIOUtils.readString(buffer);
-    prefixPath = ReadWriteIOUtils.readString(buffer);
-
-    this.index = buffer.getLong();
-  }
-
-  @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    stream.writeByte((byte) PhysicalPlanType.SET_TEMPLATE.ordinal());
-
-    ReadWriteIOUtils.write(templateName, stream);
-    ReadWriteIOUtils.write(prefixPath, stream);
-
-    stream.writeLong(index);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/UnsetTemplatePlanImpl.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/UnsetTemplatePlanImpl.java
index f10560801e..7ea0633094 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/UnsetTemplatePlanImpl.java
@@ -18,32 +18,21 @@
  *
  */
 
-package org.apache.iotdb.db.qp.physical.sys;
+package org.apache.iotdb.db.metadata.plan.schemaregion.impl;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
+public class UnsetTemplatePlanImpl implements IUnsetTemplatePlan {
 
-public class UnsetTemplatePlan extends PhysicalPlan {
+  private String prefixPath; // accessed through getPrefixPath()/setPrefixPath()
+  private String templateName; // accessed through getTemplateName()/setTemplateName()
 
-  String prefixPath;
-  String templateName;
+  public UnsetTemplatePlanImpl() {}
 
-  public UnsetTemplatePlan() {
-    super(Operator.OperatorType.UNSET_TEMPLATE);
-  }
-
-  public UnsetTemplatePlan(String prefixPath, String templateName) throws IllegalPathException {
-    super(Operator.OperatorType.UNSET_TEMPLATE);
+  public UnsetTemplatePlanImpl(String prefixPath, String templateName) throws IllegalPathException {
 
     String[] pathNodes = PathUtils.splitPathToDetachedNodes(prefixPath);
     for (String s : pathNodes) {
@@ -58,52 +47,23 @@ public class UnsetTemplatePlan extends PhysicalPlan {
     this.templateName = templateName;
   }
 
+  @Override
   public String getPrefixPath() {
     return prefixPath;
   }
 
+  @Override
   public void setPrefixPath(String prefixPath) {
     this.prefixPath = prefixPath;
   }
 
+  @Override
   public String getTemplateName() {
     return templateName;
   }
 
+  @Override
   public void setTemplateName(String templateName) {
     this.templateName = templateName;
   }
-
-  @Override
-  public List<PartialPath> getPaths() {
-    return null;
-  }
-
-  @Override
-  public void serializeImpl(ByteBuffer buffer) {
-    buffer.put((byte) PhysicalPlanType.UNSET_TEMPLATE.ordinal());
-
-    ReadWriteIOUtils.write(prefixPath, buffer);
-    ReadWriteIOUtils.write(templateName, buffer);
-
-    buffer.putLong(index);
-  }
-
-  @Override
-  public void deserialize(ByteBuffer buffer) {
-    prefixPath = ReadWriteIOUtils.readString(buffer);
-    templateName = ReadWriteIOUtils.readString(buffer);
-
-    this.index = buffer.getLong();
-  }
-
-  @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    stream.writeByte((byte) PhysicalPlanType.UNSET_TEMPLATE.ordinal());
-
-    ReadWriteIOUtils.write(prefixPath, stream);
-    ReadWriteIOUtils.write(templateName, stream);
-
-    stream.writeLong(index);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IActivateTemplateInClusterPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IActivateTemplateInClusterPlan.java
new file mode 100644
index 0000000000..f11a23a4d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IActivateTemplateInClusterPlan.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+import java.util.Arrays;
+
+public interface IActivateTemplateInClusterPlan extends ISchemaRegionPlan {
+  // Contract: type ACTIVATE_TEMPLATE_IN_CLUSTER; visitors go to visitActivateTemplateInCluster.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.ACTIVATE_TEMPLATE_IN_CLUSTER;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitActivateTemplateInCluster(this, context);
+  }
+
+  PartialPath getActivatePath();
+
+  void setActivatePath(PartialPath activatePath);
+
+  int getTemplateSetLevel(); // index of the last activate-path node kept by getPathSetTemplate()
+
+  void setTemplateSetLevel(int templateSetLevel);
+
+  int getTemplateId();
+
+  void setTemplateId(int templateId);
+
+  boolean isAligned();
+
+  void setAligned(boolean aligned);
+
+  default PartialPath getPathSetTemplate() { // activate-path nodes [0, templateSetLevel]
+    return new PartialPath(Arrays.copyOf(getActivatePath().getNodes(), getTemplateSetLevel() + 1));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IActivateTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IActivateTemplatePlan.java
new file mode 100644
index 0000000000..e44ce02c6b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IActivateTemplatePlan.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IActivateTemplatePlan extends ISchemaRegionPlan {
+  // Contract: plan type is ACTIVATE_TEMPLATE; visitors are routed to visitActivateTemplate.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.ACTIVATE_TEMPLATE;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitActivateTemplate(this, context);
+  }
+
+  PartialPath getPrefixPath();
+
+  void setPrefixPath(PartialPath prefixPath);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IAutoCreateDeviceMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IAutoCreateDeviceMNodePlan.java
new file mode 100644
index 0000000000..ae12e13bd1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IAutoCreateDeviceMNodePlan.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IAutoCreateDeviceMNodePlan extends ISchemaRegionPlan {
+  // Contract: type AUTO_CREATE_DEVICE_MNODE; visitors are routed to visitAutoCreateDeviceMNode.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.AUTO_CREATE_DEVICE_MNODE;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAutoCreateDeviceMNode(this, context);
+  }
+
+  PartialPath getPath();
+
+  void setPath(PartialPath path);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IChangeAliasPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IChangeAliasPlan.java
new file mode 100644
index 0000000000..961c1e9fe4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IChangeAliasPlan.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IChangeAliasPlan extends ISchemaRegionPlan {
+  // Contract: plan type is CHANGE_ALIAS; visitors are routed to visitChangeAlias.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.CHANGE_ALIAS;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitChangeAlias(this, context);
+  }
+
+  PartialPath getPath();
+
+  void setPath(PartialPath path);
+
+  String getAlias();
+
+  void setAlias(String alias);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IChangeTagOffsetPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IChangeTagOffsetPlan.java
new file mode 100644
index 0000000000..9677bc3944
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IChangeTagOffsetPlan.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IChangeTagOffsetPlan extends ISchemaRegionPlan {
+  // Contract: plan type is CHANGE_TAG_OFFSET; visitors are routed to visitChangeTagOffset.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.CHANGE_TAG_OFFSET;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitChangeTagOffset(this, context);
+  }
+
+  PartialPath getPath();
+
+  void setPath(PartialPath path);
+
+  long getOffset();
+
+  void setOffset(long offset);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateAlignedTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateAlignedTimeSeriesPlan.java
new file mode 100644
index 0000000000..c4a1f92ba0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateAlignedTimeSeriesPlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ICreateAlignedTimeSeriesPlan extends ISchemaRegionPlan {
+  // Contract: type CREATE_ALIGNED_TIMESERIES; visitors go to visitCreateAlignedTimeSeries.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.CREATE_ALIGNED_TIMESERIES;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateAlignedTimeSeries(this, context);
+  }
+
+  PartialPath getDevicePath();
+
+  void setDevicePath(PartialPath devicePath);
+
+  List<String> getMeasurements(); // NOTE(review): lists below presumably align by index - confirm
+
+  void setMeasurements(List<String> measurements);
+
+  List<TSDataType> getDataTypes();
+
+  void setDataTypes(List<TSDataType> dataTypes);
+
+  List<TSEncoding> getEncodings();
+
+  void setEncodings(List<TSEncoding> encodings);
+
+  List<CompressionType> getCompressors();
+
+  void setCompressors(List<CompressionType> compressors);
+
+  List<String> getAliasList();
+
+  void setAliasList(List<String> aliasList);
+
+  List<Map<String, String>> getTagsList();
+
+  void setTagsList(List<Map<String, String>> tagsList);
+
+  List<Map<String, String>> getAttributesList();
+
+  void setAttributesList(List<Map<String, String>> attributesList);
+
+  List<Long> getTagOffsets();
+
+  void setTagOffsets(List<Long> tagOffsets);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateTimeSeriesPlan.java
new file mode 100644
index 0000000000..9424dfb8db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateTimeSeriesPlan.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.Map;
+
+public interface ICreateTimeSeriesPlan extends ISchemaRegionPlan {
+  // Contract: plan type is CREATE_TIMESERIES; visitors are routed to visitCreateTimeSeries.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.CREATE_TIMESERIES;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateTimeSeries(this, context);
+  }
+
+  PartialPath getPath();
+
+  void setPath(PartialPath path);
+
+  TSDataType getDataType();
+
+  void setDataType(TSDataType dataType);
+
+  CompressionType getCompressor();
+
+  void setCompressor(CompressionType compressor);
+
+  TSEncoding getEncoding();
+
+  void setEncoding(TSEncoding encoding);
+
+  Map<String, String> getAttributes();
+
+  void setAttributes(Map<String, String> attributes);
+
+  String getAlias();
+
+  void setAlias(String alias);
+
+  Map<String, String> getTags();
+
+  void setTags(Map<String, String> tags);
+
+  Map<String, String> getProps();
+
+  void setProps(Map<String, String> props);
+
+  long getTagOffset();
+
+  void setTagOffset(long tagOffset);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IDeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IDeleteTimeSeriesPlan.java
new file mode 100644
index 0000000000..1a5d5c704b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IDeleteTimeSeriesPlan.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+import java.util.List;
+
+public interface IDeleteTimeSeriesPlan extends ISchemaRegionPlan {
+  // Contract: plan type is DELETE_TIMESERIES; visitors are routed to visitDeleteTimeSeries.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.DELETE_TIMESERIES;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeleteTimeSeries(this, context);
+  }
+
+  List<PartialPath> getDeletePathList();
+
+  void setDeletePathList(List<PartialPath> deletePathList);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IPreDeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IPreDeleteTimeSeriesPlan.java
new file mode 100644
index 0000000000..16c0043f60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IPreDeleteTimeSeriesPlan.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IPreDeleteTimeSeriesPlan extends ISchemaRegionPlan {
+  // Contract: type PRE_DELETE_TIMESERIES_IN_CLUSTER; visitors go to visitPreDeleteTimeSeries.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.PRE_DELETE_TIMESERIES_IN_CLUSTER;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPreDeleteTimeSeries(this, context);
+  }
+
+  PartialPath getPath();
+
+  void setPath(PartialPath path);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IRollbackPreDeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IRollbackPreDeleteTimeSeriesPlan.java
new file mode 100644
index 0000000000..c2e4bbed6a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IRollbackPreDeleteTimeSeriesPlan.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IRollbackPreDeleteTimeSeriesPlan extends ISchemaRegionPlan {
+  // Contract: type ROLLBACK_PRE_DELETE_TIMESERIES; goes to visitRollbackPreDeleteTimeSeries.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.ROLLBACK_PRE_DELETE_TIMESERIES;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRollbackPreDeleteTimeSeries(this, context);
+  }
+
+  PartialPath getPath();
+
+  void setPath(PartialPath path);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ISetTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ISetTemplatePlan.java
new file mode 100644
index 0000000000..ef0412d7ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ISetTemplatePlan.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface ISetTemplatePlan extends ISchemaRegionPlan {
+  // Contract: plan type is SET_TEMPLATE; visitors are routed to visitSetTemplate.
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.SET_TEMPLATE;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSetTemplate(this, context);
+  }
+
+  String getTemplateName();
+
+  void setTemplateName(String templateName);
+
+  String getPrefixPath(); // a plain String here, unlike the PartialPath used by sibling plans
+
+  void setPrefixPath(String prefixPath);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IUnsetTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IUnsetTemplatePlan.java
new file mode 100644
index 0000000000..d76e98eb83
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/IUnsetTemplatePlan.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan.schemaregion.write;
+
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+
+public interface IUnsetTemplatePlan extends ISchemaRegionPlan {
+
+  @Override
+  default SchemaRegionPlanType getPlanType() {
+    return SchemaRegionPlanType.UNSET_TEMPLATE;
+  }
+
+  @Override
+  default <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) {
+    return visitor.visitUnsetTemplate(this, context);
+  }
+
+  String getPrefixPath();
+
+  void setPrefixPath(String prefixPath);
+
+  String getTemplateName();
+
+  void setTemplateName(String templateName);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 77d1b5e81f..eccdde71de 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
@@ -106,9 +106,9 @@ public interface ISchemaRegion {
   // endregion
 
   // region Interfaces for Timeseries operation
-  void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException;
+  void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException;
 
-  void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException;
+  void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException;
 
   Map<Integer, MetadataException> checkMeasurementExistence(
       PartialPath devicePath, List<String> measurementList, List<String> aliasList);
@@ -154,7 +154,7 @@ public interface ISchemaRegion {
 
   // region Interfaces for auto create device
   // auto create a deviceMNode, currently only used for schema sync operation
-  void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException;
+  void autoCreateDeviceMNode(IAutoCreateDeviceMNodePlan plan) throws MetadataException;
   // endregion
 
   // region Interfaces for metadata info Query
@@ -420,13 +420,13 @@ public interface ISchemaRegion {
   boolean isTemplateAppendable(Template template, List<String> measurements)
       throws MetadataException;
 
-  void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException;
+  void setSchemaTemplate(ISetTemplatePlan plan) throws MetadataException;
 
-  void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException;
+  void unsetSchemaTemplate(IUnsetTemplatePlan plan) throws MetadataException;
 
-  void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException;
+  void setUsingSchemaTemplate(IActivateTemplatePlan plan) throws MetadataException;
 
-  void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template)
+  void activateSchemaTemplate(IActivateTemplateInClusterPlan plan, Template template)
       throws MetadataException;
 
   List<String> getPathsUsingTemplate(int templateId) throws MetadataException;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index ed35f0e642..b9f11f3271 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -47,13 +47,32 @@ import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
-import org.apache.iotdb.db.metadata.logfile.MLogReader;
-import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.logfile.FakeCRC32Deserializer;
+import org.apache.iotdb.db.metadata.logfile.FakeCRC32Serializer;
+import org.apache.iotdb.db.metadata.logfile.SchemaLogReader;
+import org.apache.iotdb.db.metadata.logfile.SchemaLogWriter;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanFactory;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanSerializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
 import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
@@ -61,25 +80,11 @@ import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.MeasurementSchemaInfo;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
-import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.PreDeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.RollbackPreDeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
@@ -171,7 +176,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
   // the log file writer
   private boolean usingMLog = true;
-  private MLogWriter logWriter;
+  private SchemaLogWriter<ISchemaRegionPlan> logWriter;
 
   private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
@@ -297,13 +302,17 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private void initMLog() throws IOException {
     int lineNumber = initFromLog();
 
-    logWriter = new MLogWriter(schemaRegionDirPath, MetadataConstant.METADATA_LOG);
-    logWriter.setLogNum(lineNumber);
+    logWriter =
+        new SchemaLogWriter<>(
+            schemaRegionDirPath,
+            MetadataConstant.METADATA_LOG,
+            new FakeCRC32Serializer<>(new SchemaRegionPlanSerializer()),
+            config.getSyncMlogPeriodInMs() == 0);
   }
 
-  public void writeToMLog(PhysicalPlan plan) throws IOException {
+  public void writeToMLog(ISchemaRegionPlan schemaRegionPlan) throws IOException {
     if (usingMLog && !isRecovering) {
-      logWriter.putLog(plan);
+      logWriter.write(schemaRegionPlan);
     }
   }
 
@@ -336,8 +345,11 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // init the metadata from the operation log
     if (logFile.exists()) {
       int idx = 0;
-      try (MLogReader mLogReader =
-          new MLogReader(schemaRegionDirPath, MetadataConstant.METADATA_LOG); ) {
+      try (SchemaLogReader<ISchemaRegionPlan> mLogReader =
+          new SchemaLogReader<>(
+              schemaRegionDirPath,
+              MetadataConstant.METADATA_LOG,
+              new FakeCRC32Deserializer<>(new SchemaRegionPlanDeserializer()))) {
         idx = applyMLog(mLogReader);
         logger.debug(
             "spend {} ms to deserialize {} mtree from mlog.bin",
@@ -353,9 +365,11 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
-  private int applyMLog(MLogReader mLogReader) {
+  private int applyMLog(SchemaLogReader<ISchemaRegionPlan> mLogReader) {
     int idx = 0;
-    PhysicalPlan plan;
+    ISchemaRegionPlan plan;
+    RecoverPlanOperator recoverPlanOperator = new RecoverPlanOperator();
+    RecoverOperationResult operationResult;
     while (mLogReader.hasNext()) {
       try {
         plan = mLogReader.next();
@@ -367,13 +381,20 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       if (plan == null) {
         continue;
       }
-      try {
-        operation(plan);
-      } catch (MetadataException | IOException e) {
-        logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
+      operationResult = plan.accept(recoverPlanOperator, this);
+      if (operationResult.isFailed()) {
+        logger.error(
+            "Can not operate cmd {} for err:",
+            plan.getPlanType().name(),
+            operationResult.getException());
       }
     }
 
+    if (mLogReader.isFileCorrupted()) {
+      throw new IllegalStateException(
+          "The mlog.bin has been corrupted. Please remove it or fix it, and then restart IoTDB");
+    }
+
     return idx;
   }
 
@@ -403,65 +424,6 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
-  // this method is mainly used for recover
-  private void operation(PhysicalPlan plan) throws IOException, MetadataException {
-    switch (plan.getOperatorType()) {
-      case CREATE_TIMESERIES:
-        CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
-        createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
-        break;
-      case CREATE_ALIGNED_TIMESERIES:
-        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
-            (CreateAlignedTimeSeriesPlan) plan;
-        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
-        break;
-      case DELETE_TIMESERIES:
-        DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
-        // cause we only has one path for one DeleteTimeSeriesPlan
-        deleteOneTimeseriesUpdateStatisticsAndDropTrigger(deleteTimeSeriesPlan.getPaths().get(0));
-        break;
-      case CHANGE_ALIAS:
-        ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
-        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
-        break;
-      case CHANGE_TAG_OFFSET:
-        ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
-        changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
-        break;
-      case SET_TEMPLATE:
-        SetTemplatePlan setTemplatePlan = (SetTemplatePlan) plan;
-        setSchemaTemplate(setTemplatePlan);
-        break;
-      case ACTIVATE_TEMPLATE:
-        ActivateTemplatePlan activateTemplatePlan = (ActivateTemplatePlan) plan;
-        setUsingSchemaTemplate(activateTemplatePlan);
-        break;
-      case AUTO_CREATE_DEVICE_MNODE:
-        AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
-        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
-        break;
-      case UNSET_TEMPLATE:
-        UnsetTemplatePlan unsetTemplatePlan = (UnsetTemplatePlan) plan;
-        unsetSchemaTemplate(unsetTemplatePlan);
-        break;
-      case ACTIVATE_TEMPLATE_IN_CLUSTER:
-        ActivateTemplateInClusterPlan activateTemplateInClusterPlan =
-            (ActivateTemplateInClusterPlan) plan;
-        recoverActivatingSchemaTemplate(activateTemplateInClusterPlan);
-        break;
-      case PRE_DELETE_TIMESERIES_IN_CLUSTER:
-        PreDeleteTimeSeriesPlan preDeleteTimeSeriesPlan = (PreDeleteTimeSeriesPlan) plan;
-        recoverPreDeleteTimeseries(preDeleteTimeSeriesPlan.getPath());
-        break;
-      case ROLLBACK_PRE_DELETE_TIMESERIES:
-        RollbackPreDeleteTimeSeriesPlan rollbackPreDeleteTimeSeriesPlan =
-            (RollbackPreDeleteTimeSeriesPlan) plan;
-        recoverRollbackPreDeleteTimeseries(rollbackPreDeleteTimeSeriesPlan.getPath());
-        break;
-      default:
-        logger.error("Unrecognizable command {}", plan.getOperatorType());
-    }
-  }
   // endregion
 
   // region Interfaces for schema region Info query and operation
@@ -602,13 +564,13 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   // region Interfaces and Implementation for Timeseries operation
   // including create and delete
 
-  public void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
+  public void createTimeseries(ICreateTimeSeriesPlan plan) throws MetadataException {
     createTimeseries(plan, -1);
   }
 
   @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+  public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
       throw new SeriesOverflowException();
     }
@@ -688,7 +650,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param plan CreateAlignedTimeSeriesPlan
    */
   @Override
-  public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+  public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
     int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
       throw new SeriesOverflowException();
@@ -699,7 +661,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
 
     try {
-      PartialPath prefixPath = plan.getPrefixPath();
+      PartialPath prefixPath = plan.getDevicePath();
       List<String> measurements = plan.getMeasurements();
       List<TSDataType> dataTypes = plan.getDataTypes();
       List<TSEncoding> encodings = plan.getEncodings();
@@ -783,7 +745,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
     // update id table if not in recovering or disable id table log file
     if (config.isEnableIDTable() && (!isRecovering || !config.isEnableIDTableLogFile())) {
-      IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
+      IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getDevicePath());
       idTable.createAlignedTimeseries(plan);
     }
   }
@@ -832,7 +794,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
         preDeletedNum++;
         measurementMNode.setPreDeleted(true);
         try {
-          writeToMLog(new PreDeleteTimeSeriesPlan(measurementMNode.getPartialPath()));
+          writeToMLog(
+              SchemaRegionPlanFactory.getPreDeleteTimeSeriesPlan(
+                  measurementMNode.getPartialPath()));
         } catch (IOException e) {
           throw new MetadataException(e);
         }
@@ -852,7 +816,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       for (IMeasurementMNode measurementMNode : mtree.getMatchedMeasurementMNode(pathPattern)) {
         measurementMNode.setPreDeleted(false);
         try {
-          writeToMLog(new RollbackPreDeleteTimeSeriesPlan(measurementMNode.getPartialPath()));
+          writeToMLog(
+              SchemaRegionPlanFactory.getRollbackPreDeleteTimeSeriesPlan(
+                  measurementMNode.getPartialPath()));
         } catch (IOException e) {
           throw new MetadataException(e);
         }
@@ -878,7 +844,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       for (PartialPath path : mtree.getPreDeletedTimeseries(pathPattern)) {
         try {
           deleteSingleTimeseriesInBlackList(path);
-          writeToMLog(new DeleteTimeSeriesPlan(Collections.singletonList(path)));
+          writeToMLog(
+              SchemaRegionPlanFactory.getDeleteTimeSeriesPlan(Collections.singletonList(path)));
         } catch (IOException e) {
           throw new MetadataException(e);
         }
@@ -925,15 +892,13 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
   private void deleteSingleTimeseriesInternal(PartialPath p, Set<String> failedNames)
       throws MetadataException, IOException {
-    DeleteTimeSeriesPlan deleteTimeSeriesPlan = new DeleteTimeSeriesPlan();
     try {
       PartialPath emptyStorageGroup = deleteOneTimeseriesUpdateStatisticsAndDropTrigger(p);
       if (!isRecovering) {
         if (emptyStorageGroup != null) {
           StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(emptyStorageGroup);
         }
-        deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
-        writeToMLog(deleteTimeSeriesPlan);
+        writeToMLog(SchemaRegionPlanFactory.getDeleteTimeSeriesPlan(Collections.singletonList(p)));
       }
     } catch (DeleteFailedException e) {
       failedNames.add(e.getName());
@@ -991,12 +956,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
 
     node = mtree.getDeviceNodeWithAutoCreating(path);
-    writeToMLog(new AutoCreateDeviceMNodePlan(node.getPartialPath()));
+    writeToMLog(SchemaRegionPlanFactory.getAutoCreateDeviceMNodePlan(node.getPartialPath()));
     return node;
   }
 
   @Override
-  public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
+  public void autoCreateDeviceMNode(IAutoCreateDeviceMNodePlan plan) throws MetadataException {
     mtree.getDeviceNodeWithAutoCreating(plan.getPath());
     try {
       writeToMLog(plan);
@@ -1366,28 +1331,6 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     return new Pair<>(res, ans.right);
   }
 
-  /**
-   * Get series type for given seriesPath.
-   *
-   * @param fullPath full path
-   */
-  public TSDataType getSeriesType(PartialPath fullPath) throws MetadataException {
-    if (fullPath.equals(SQLConstant.TIME_PATH)) {
-      return TSDataType.INT64;
-    }
-    return getSeriesSchema(fullPath).getType();
-  }
-
-  /**
-   * Get schema of paritialPath
-   *
-   * @param fullPath (may be ParitialPath or AlignedPath)
-   * @return MeasurementSchema
-   */
-  public IMeasurementSchema getSeriesSchema(PartialPath fullPath) throws MetadataException {
-    return getMeasurementMNode(fullPath).getSchema();
-  }
-
   // attention: this path must be a device node
   @Override
   public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
@@ -1498,7 +1441,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     mtree.setAlias(leafMNode, alias);
 
     try {
-      writeToMLog(new ChangeAliasPlan(path, alias));
+      writeToMLog(SchemaRegionPlanFactory.getChangeAliasPlan(path, alias));
     } catch (IOException e) {
       throw new MetadataException(e);
     }
@@ -1533,7 +1476,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // no tag or attribute, we need to add a new record in log
     if (leafMNode.getOffset() < 0) {
       long offset = tagManager.writeTagFile(tagsMap, attributesMap);
-      writeToMLog(new ChangeTagOffsetPlan(fullPath, offset));
+      writeToMLog(SchemaRegionPlanFactory.getChangeTagOffsetPlan(fullPath, offset));
       leafMNode.setOffset(offset);
       // update inverted Index map
       if (tagsMap != null && !tagsMap.isEmpty()) {
@@ -1559,7 +1502,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
       mtree.setAlias(leafMNode, alias);
       // persist to WAL
-      writeToMLog(new ChangeAliasPlan(fullPath, alias));
+      writeToMLog(SchemaRegionPlanFactory.getChangeAliasPlan(fullPath, alias));
     }
   }
 
@@ -1577,7 +1520,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // no tag or attribute, we need to add a new record in log
     if (leafMNode.getOffset() < 0) {
       long offset = tagManager.writeTagFile(Collections.emptyMap(), attributesMap);
-      writeToMLog(new ChangeTagOffsetPlan(fullPath, offset));
+      writeToMLog(SchemaRegionPlanFactory.getChangeTagOffsetPlan(fullPath, offset));
       leafMNode.setOffset(offset);
       return;
     }
@@ -1598,7 +1541,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // no tag or attribute, we need to add a new record in log
     if (leafMNode.getOffset() < 0) {
       long offset = tagManager.writeTagFile(tagsMap, Collections.emptyMap());
-      writeToMLog(new ChangeTagOffsetPlan(fullPath, offset));
+      writeToMLog(SchemaRegionPlanFactory.getChangeTagOffsetPlan(fullPath, offset));
       leafMNode.setOffset(offset);
       // update inverted Index map
       tagManager.addIndex(tagsMap, leafMNode);
@@ -1874,7 +1817,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       throws MetadataException {
     try {
       createTimeseries(
-          new CreateTimeSeriesPlan(
+          SchemaRegionPlanFactory.getCreateTimeSeriesPlan(
               path, dataType, encoding, compressor, Collections.emptyMap(), null, null, null));
     } catch (MeasurementAlreadyExistException e) {
       if (logger.isDebugEnabled()) {
@@ -1896,7 +1839,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       throws MetadataException {
     try {
       createAlignedTimeSeries(
-          new CreateAlignedTimeSeriesPlan(
+          SchemaRegionPlanFactory.getCreateAlignedTimeSeriesPlan(
               devicePath,
               Collections.singletonList(measurement),
               Collections.singletonList(dataType),
@@ -2004,7 +1947,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   }
 
   @Override
-  public synchronized void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
+  public synchronized void setSchemaTemplate(ISetTemplatePlan plan) throws MetadataException {
     // get mnode and update template should be atomic
     Template template = TemplateManager.getInstance().getTemplate(plan.getTemplateName());
 
@@ -2030,7 +1973,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   }
 
   @Override
-  public synchronized void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
+  public synchronized void unsetSchemaTemplate(IUnsetTemplatePlan plan) throws MetadataException {
     // get mnode should be atomic
     try {
       PartialPath path = new PartialPath(plan.getPrefixPath());
@@ -2055,7 +1998,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   }
 
   @Override
-  public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
+  public void setUsingSchemaTemplate(IActivateTemplatePlan plan) throws MetadataException {
     // check whether any template has been set on designated path
     if (mtree.getTemplateOnPath(plan.getPrefixPath()) == null) {
       throw new MetadataException(
@@ -2105,7 +2048,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(mountedMNode.getPartialPath());
     }
     try {
-      writeToMLog(new ActivateTemplatePlan(node.getPartialPath()));
+      writeToMLog(SchemaRegionPlanFactory.getActivateTemplatePlan(node.getPartialPath()));
     } catch (IOException e) {
       throw new MetadataException(e);
     }
@@ -2113,7 +2056,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   }
 
   @Override
-  public void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template)
+  public void activateSchemaTemplate(IActivateTemplateInClusterPlan plan, Template template)
       throws MetadataException {
     try {
       if (plan.getPathSetTemplate().getFullPath().length() <= storageGroupFullPath.length()) {
@@ -2130,7 +2073,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
-  private void recoverActivatingSchemaTemplate(ActivateTemplateInClusterPlan plan) {
+  private void recoverActivatingSchemaTemplate(IActivateTemplateInClusterPlan plan) {
     if (plan.getPathSetTemplate().getFullPath().length() <= storageGroupFullPath.length()) {
       templateId = plan.getTemplateId();
     }
@@ -2163,4 +2106,168 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   }
 
   // endregion
+
+  private static class RecoverOperationResult {
+
+    private static final RecoverOperationResult SUCCESS = new RecoverOperationResult(null);
+
+    private final Exception e;
+
+    private RecoverOperationResult(Exception e) {
+      this.e = e;
+    }
+
+    private boolean isFailed() {
+      return e != null; // a recorded exception marks failure; SUCCESS carries null
+    }
+
+    private Exception getException() {
+      return e;
+    }
+  }
+
+  private class RecoverPlanOperator
+      extends SchemaRegionPlanVisitor<RecoverOperationResult, SchemaRegionMemoryImpl> {
+
+    @Override
+    public RecoverOperationResult visitSchemaRegionPlan(
+        ISchemaRegionPlan plan, SchemaRegionMemoryImpl context) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "SchemaRegionPlan of type %s doesn't support recover operation in SchemaRegionMemoryImpl.",
+              plan.getPlanType().name()));
+    }
+
+    @Override
+    public RecoverOperationResult visitCreateTimeSeries(
+        ICreateTimeSeriesPlan createTimeSeriesPlan, SchemaRegionMemoryImpl context) {
+      try {
+        createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitCreateAlignedTimeSeries(
+        ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan, SchemaRegionMemoryImpl context) {
+      try {
+        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitDeleteTimeSeries(
+        IDeleteTimeSeriesPlan deleteTimeSeriesPlan, SchemaRegionMemoryImpl context) {
+      try {
+        // a logged DeleteTimeSeriesPlan carries exactly one path, so only the first is replayed
+        deleteOneTimeseriesUpdateStatisticsAndDropTrigger(
+            deleteTimeSeriesPlan.getDeletePathList().get(0));
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException | IOException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitChangeAlias(
+        IChangeAliasPlan changeAliasPlan, SchemaRegionMemoryImpl context) {
+      try {
+        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitChangeTagOffset(
+        IChangeTagOffsetPlan changeTagOffsetPlan, SchemaRegionMemoryImpl context) {
+      try {
+        changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitAutoCreateDeviceMNode(
+        IAutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan, SchemaRegionMemoryImpl context) {
+      try {
+        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitActivateTemplateInCluster(
+        IActivateTemplateInClusterPlan activateTemplateInClusterPlan,
+        SchemaRegionMemoryImpl context) {
+      recoverActivatingSchemaTemplate(activateTemplateInClusterPlan);
+      return RecoverOperationResult.SUCCESS;
+    }
+
+    @Override
+    public RecoverOperationResult visitPreDeleteTimeSeries(
+        IPreDeleteTimeSeriesPlan preDeleteTimeSeriesPlan, SchemaRegionMemoryImpl context) {
+      try {
+        recoverPreDeleteTimeseries(preDeleteTimeSeriesPlan.getPath());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitRollbackPreDeleteTimeSeries(
+        IRollbackPreDeleteTimeSeriesPlan rollbackPreDeleteTimeSeriesPlan,
+        SchemaRegionMemoryImpl context) {
+      try {
+        recoverRollbackPreDeleteTimeseries(rollbackPreDeleteTimeSeriesPlan.getPath());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitSetTemplate(
+        ISetTemplatePlan setTemplatePlan, SchemaRegionMemoryImpl context) {
+      try {
+        setSchemaTemplate(setTemplatePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitUnsetTemplate(
+        IUnsetTemplatePlan unsetTemplatePlan, SchemaRegionMemoryImpl context) {
+      try {
+        unsetSchemaTemplate(unsetTemplatePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitActivateTemplate(
+        IActivateTemplatePlan activateTemplatePlan, SchemaRegionMemoryImpl context) {
+      try {
+        setUsingSchemaTemplate(activateTemplatePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index fbe6523ac3..d8404e50a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -45,36 +45,41 @@ import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
-import org.apache.iotdb.db.metadata.logfile.MLogReader;
-import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.logfile.FakeCRC32Deserializer;
+import org.apache.iotdb.db.metadata.logfile.FakeCRC32Serializer;
+import org.apache.iotdb.db.metadata.logfile.SchemaLogReader;
+import org.apache.iotdb.db.metadata.logfile.SchemaLogWriter;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanFactory;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanSerializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
 import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
-import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
@@ -163,7 +168,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   // the log file seriesPath
   private String logFilePath;
   private File logFile;
-  private MLogWriter logWriter;
+  private SchemaLogWriter<ISchemaRegionPlan> logWriter;
 
   private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
@@ -263,8 +268,12 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
       int lineNumber = initFromLog(logFile);
 
-      logWriter = new MLogWriter(schemaRegionDirPath, MetadataConstant.METADATA_LOG);
-      logWriter.setLogNum(lineNumber);
+      logWriter =
+          new SchemaLogWriter<>(
+              schemaRegionDirPath,
+              MetadataConstant.METADATA_LOG,
+              new FakeCRC32Serializer<>(new SchemaRegionPlanSerializer()),
+              config.getSyncMlogPeriodInMs() == 0);
       isRecovering = false;
     } catch (IOException e) {
       logger.error(
@@ -298,8 +307,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     // init the metadata from the operation log
     if (logFile.exists()) {
       int idx = 0;
-      try (MLogReader mLogReader =
-          new MLogReader(schemaRegionDirPath, MetadataConstant.METADATA_LOG); ) {
+      try (SchemaLogReader<ISchemaRegionPlan> mLogReader =
+          new SchemaLogReader<>(
+              schemaRegionDirPath,
+              MetadataConstant.METADATA_LOG,
+              new FakeCRC32Deserializer<>(new SchemaRegionPlanDeserializer()))) {
         idx = applyMLog(mLogReader);
         logger.debug(
             "spend {} ms to deserialize {} mtree from mlog.bin",
@@ -315,9 +327,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
   }
 
-  private int applyMLog(MLogReader mLogReader) {
+  private int applyMLog(SchemaLogReader<ISchemaRegionPlan> mLogReader) {
     int idx = 0;
-    PhysicalPlan plan;
+    ISchemaRegionPlan plan;
+    RecoverPlanOperator recoverPlanOperator = new RecoverPlanOperator();
+    RecoverOperationResult operationResult;
     while (mLogReader.hasNext()) {
       try {
         plan = mLogReader.next();
@@ -329,13 +343,20 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       if (plan == null) {
         continue;
       }
-      try {
-        operation(plan);
-      } catch (MetadataException | IOException e) {
-        logger.error("Can not operate cmd {} for err:", plan.getOperatorType(), e);
+      operationResult = plan.accept(recoverPlanOperator, this);
+      if (operationResult.isFailed()) {
+        logger.error(
+            "Can not operate cmd {} for err:",
+            plan.getPlanType().name(),
+            operationResult.getException());
       }
     }
 
+    if (mLogReader.isFileCorrupted()) {
+      throw new IllegalStateException(
+          "The mlog.bin has been corrupted. Please remove it or fix it, and then restart IoTDB");
+    }
+
     return idx;
   }
 
@@ -364,51 +385,6 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     isClearing = false;
   }
 
-  // this method is mainly used for recover
-  private void operation(PhysicalPlan plan) throws IOException, MetadataException {
-    switch (plan.getOperatorType()) {
-      case CREATE_TIMESERIES:
-        CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
-        recoverTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
-        break;
-      case CREATE_ALIGNED_TIMESERIES:
-        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
-            (CreateAlignedTimeSeriesPlan) plan;
-        recoverAlignedTimeSeries(createAlignedTimeSeriesPlan);
-        break;
-      case DELETE_TIMESERIES:
-        DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
-        // cause we only has one path for one DeleteTimeSeriesPlan
-        deleteOneTimeseriesUpdateStatisticsAndDropTrigger(deleteTimeSeriesPlan.getPaths().get(0));
-        break;
-      case CHANGE_ALIAS:
-        ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
-        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
-        break;
-      case CHANGE_TAG_OFFSET:
-        ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
-        changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
-        break;
-      case SET_TEMPLATE:
-        SetTemplatePlan setTemplatePlan = (SetTemplatePlan) plan;
-        setSchemaTemplate(setTemplatePlan);
-        break;
-      case ACTIVATE_TEMPLATE:
-        ActivateTemplatePlan activateTemplatePlan = (ActivateTemplatePlan) plan;
-        setUsingSchemaTemplate(activateTemplatePlan);
-        break;
-      case AUTO_CREATE_DEVICE_MNODE:
-        AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
-        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
-        break;
-      case UNSET_TEMPLATE:
-        UnsetTemplatePlan unsetTemplatePlan = (UnsetTemplatePlan) plan;
-        unsetSchemaTemplate(unsetTemplatePlan);
-        break;
-      default:
-        logger.error("Unrecognizable command {}", plan.getOperatorType());
-    }
-  }
   // endregion
 
   // region Interfaces for schema region Info query and operation
@@ -461,11 +437,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   // region Interfaces and Implementation for Timeseries operation
   // including create and delete
 
-  public void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
+  public void createTimeseries(ICreateTimeSeriesPlan plan) throws MetadataException {
     createTimeseries(plan, -1);
   }
 
-  public void recoverTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+  public void recoverTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
     boolean done = false;
     while (!done) {
       try {
@@ -486,7 +462,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
   @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+  public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
       logger.error(
           String.format("Series overflow when creating: [%s]", plan.getPath().getFullPath()));
@@ -545,7 +521,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
             offset = tagManager.writeTagFile(plan.getTags(), plan.getAttributes());
           }
           plan.setTagOffset(offset);
-          logWriter.createTimeseries(plan);
+          logWriter.write(plan);
         }
         if (offset != -1) {
           leafMNode.setOffset(offset);
@@ -584,7 +560,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       throws MetadataException {
     try {
       createTimeseries(
-          new CreateTimeSeriesPlan(path, dataType, encoding, compressor, props, null, null, null));
+          SchemaRegionPlanFactory.getCreateTimeSeriesPlan(
+              path, dataType, encoding, compressor, props, null, null, null));
     } catch (PathAlreadyExistException | AliasAlreadyExistException e) {
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -603,11 +580,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       List<CompressionType> compressors)
       throws MetadataException {
     createAlignedTimeSeries(
-        new CreateAlignedTimeSeriesPlan(
+        SchemaRegionPlanFactory.getCreateAlignedTimeSeriesPlan(
             prefixPath, measurements, dataTypes, encodings, compressors, null, null, null));
   }
 
-  public void recoverAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+  public void recoverAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
     boolean done = false;
     while (!done) {
       try {
@@ -632,7 +609,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param plan CreateAlignedTimeSeriesPlan
    */
   @Override
-  public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+  public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
     int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
       throw new SeriesOverflowException();
@@ -643,7 +620,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
 
     try {
-      PartialPath prefixPath = plan.getPrefixPath();
+      PartialPath prefixPath = plan.getDevicePath();
       List<String> measurements = plan.getMeasurements();
       List<TSDataType> dataTypes = plan.getDataTypes();
       List<TSEncoding> encodings = plan.getEncodings();
@@ -713,7 +690,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
             }
           }
           plan.setTagOffsets(tagOffsets);
-          logWriter.createAlignedTimeseries(plan);
+          logWriter.write(plan);
         }
         tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -733,7 +710,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
     // update id table if not in recovering or disable id table log file
     if (config.isEnableIDTable() && (!isRecovering || !config.isEnableIDTableLogFile())) {
-      IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
+      IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getDevicePath());
       idTable.createAlignedTimeseries(plan);
     }
   }
@@ -805,15 +782,14 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
   private void deleteSingleTimeseriesInternal(PartialPath p, Set<String> failedNames)
       throws MetadataException, IOException {
-    DeleteTimeSeriesPlan deleteTimeSeriesPlan = new DeleteTimeSeriesPlan();
     try {
       PartialPath emptyStorageGroup = deleteOneTimeseriesUpdateStatisticsAndDropTrigger(p);
       if (!isRecovering) {
         if (emptyStorageGroup != null) {
           StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(emptyStorageGroup);
         }
-        deleteTimeSeriesPlan.setDeletePathList(Collections.singletonList(p));
-        logWriter.deleteTimeseries(deleteTimeSeriesPlan);
+        logWriter.write(
+            SchemaRegionPlanFactory.getDeleteTimeSeriesPlan(Collections.singletonList(p)));
       }
     } catch (DeleteFailedException e) {
       failedNames.add(e.getName());
@@ -883,18 +859,18 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
     node = mtree.getDeviceNodeWithAutoCreating(path);
     if (!isRecovering) {
-      logWriter.autoCreateDeviceMNode(new AutoCreateDeviceMNodePlan(node.getPartialPath()));
+      logWriter.write(SchemaRegionPlanFactory.getAutoCreateDeviceMNodePlan(node.getPartialPath()));
     }
     return node;
   }
 
   @Override
-  public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
+  public void autoCreateDeviceMNode(IAutoCreateDeviceMNodePlan plan) throws MetadataException {
     IMNode node = mtree.getDeviceNodeWithAutoCreating(plan.getPath());
     mtree.unPinMNode(node);
     if (!isRecovering) {
       try {
-        logWriter.autoCreateDeviceMNode(plan);
+        logWriter.write(plan);
       } catch (IOException e) {
         throw new MetadataException(e);
       }
@@ -1250,28 +1226,6 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     return new Pair<>(res, ans.right);
   }
 
-  /**
-   * Get series type for given seriesPath.
-   *
-   * @param fullPath full path
-   */
-  public TSDataType getSeriesType(PartialPath fullPath) throws MetadataException {
-    if (fullPath.equals(SQLConstant.TIME_PATH)) {
-      return TSDataType.INT64;
-    }
-    return getSeriesSchema(fullPath).getType();
-  }
-
-  /**
-   * Get schema of paritialPath
-   *
-   * @param fullPath (may be ParitialPath or AlignedPath)
-   * @return MeasurementSchema
-   */
-  public IMeasurementSchema getSeriesSchema(PartialPath fullPath) throws MetadataException {
-    return getMeasurementMNode(fullPath).getSchema();
-  }
-
   // attention: this path must be a device node
   @Override
   public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
@@ -1399,7 +1353,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
     try {
       if (!isRecovering) {
-        logWriter.changeAlias(path, alias);
+        logWriter.write(SchemaRegionPlanFactory.getChangeAliasPlan(path, alias));
       }
     } catch (IOException e) {
       throw new MetadataException(e);
@@ -1435,7 +1389,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       // no tag or attribute, we need to add a new record in log
       if (leafMNode.getOffset() < 0) {
         long offset = tagManager.writeTagFile(tagsMap, attributesMap);
-        logWriter.changeOffset(fullPath, offset);
+        logWriter.write(SchemaRegionPlanFactory.getChangeTagOffsetPlan(fullPath, offset));
         leafMNode.setOffset(offset);
         mtree.updateMNode(leafMNode);
         // update inverted Index map
@@ -1466,7 +1420,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
       mtree.setAlias(leafMNode, alias);
       // persist to WAL
-      logWriter.changeAlias(fullPath, alias);
+      logWriter.write(SchemaRegionPlanFactory.getChangeAliasPlan(fullPath, alias));
     }
   }
 
@@ -1484,7 +1438,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       // no tag or attribute, we need to add a new record in log
       if (leafMNode.getOffset() < 0) {
         long offset = tagManager.writeTagFile(Collections.emptyMap(), attributesMap);
-        logWriter.changeOffset(fullPath, offset);
+        logWriter.write(SchemaRegionPlanFactory.getChangeTagOffsetPlan(fullPath, offset));
         leafMNode.setOffset(offset);
         mtree.updateMNode(leafMNode);
         return;
@@ -1510,7 +1464,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       // no tag or attribute, we need to add a new record in log
       if (leafMNode.getOffset() < 0) {
         long offset = tagManager.writeTagFile(tagsMap, Collections.emptyMap());
-        logWriter.changeOffset(fullPath, offset);
+        logWriter.write(SchemaRegionPlanFactory.getChangeTagOffsetPlan(fullPath, offset));
         leafMNode.setOffset(offset);
         mtree.updateMNode(leafMNode);
         // update inverted Index map
@@ -1865,7 +1819,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   }
 
   @Override
-  public synchronized void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
+  public synchronized void setSchemaTemplate(ISetTemplatePlan plan) throws MetadataException {
     // get mnode and update template should be atomic
     Template template = TemplateManager.getInstance().getTemplate(plan.getTemplateName());
 
@@ -1890,7 +1844,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
       // write wal
       if (!isRecovering) {
-        logWriter.setSchemaTemplate(plan);
+        logWriter.write(plan);
       }
     } catch (IOException e) {
       throw new MetadataException(e);
@@ -1898,7 +1852,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   }
 
   @Override
-  public synchronized void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
+  public synchronized void unsetSchemaTemplate(IUnsetTemplatePlan plan) throws MetadataException {
     // get mnode should be atomic
     try {
       PartialPath path = new PartialPath(plan.getPrefixPath());
@@ -1917,7 +1871,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
           .unmarkSchemaRegion(template, storageGroupFullPath, schemaRegionId);
       // write wal
       if (!isRecovering) {
-        logWriter.unsetSchemaTemplate(plan);
+        logWriter.write(plan);
       }
     } catch (IOException e) {
       throw new MetadataException(e);
@@ -1925,7 +1879,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   }
 
   @Override
-  public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
+  public void setUsingSchemaTemplate(IActivateTemplatePlan plan) throws MetadataException {
     // check whether any template has been set on designated path
     if (mtree.getTemplateOnPath(plan.getPrefixPath()) == null) {
       throw new MetadataException(
@@ -1949,7 +1903,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   }
 
   @Override
-  public void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template)
+  public void activateSchemaTemplate(IActivateTemplateInClusterPlan plan, Template template)
       throws MetadataException {
     throw new UnsupportedOperationException();
   }
@@ -1992,7 +1946,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
     if (!isRecovering) {
       try {
-        logWriter.setUsingSchemaTemplate(node.getPartialPath());
+        logWriter.write(SchemaRegionPlanFactory.getActivateTemplatePlan(node.getPartialPath()));
       } catch (IOException e) {
         throw new MetadataException(e);
       }
@@ -2014,4 +1968,138 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   }
 
   // endregion
+
+  private static class RecoverOperationResult {
+
+    private static final RecoverOperationResult SUCCESS = new RecoverOperationResult(null);
+
+    private final Exception e;
+
+    private RecoverOperationResult(Exception e) {
+      this.e = e;
+    }
+
+    private boolean isFailed() {
+      return e == null;
+    }
+
+    private Exception getException() {
+      return e;
+    }
+  }
+
+  private class RecoverPlanOperator
+      extends SchemaRegionPlanVisitor<RecoverOperationResult, SchemaRegionSchemaFileImpl> {
+    // Each visit applies one logged plan via the enclosing region's methods; context is unused.
+    @Override
+    public RecoverOperationResult visitSchemaRegionPlan(
+        ISchemaRegionPlan plan, SchemaRegionSchemaFileImpl context) {
+      throw new UnsupportedOperationException( // unchecked: thrown to the caller, not returned
+          String.format(
+              "SchemaRegionPlan of type %s doesn't support recover operation in SchemaRegionSchemaFileImpl.",
+              plan.getPlanType().name()));
+    }
+
+    @Override
+    public RecoverOperationResult visitCreateTimeSeries(
+        ICreateTimeSeriesPlan createTimeSeriesPlan, SchemaRegionSchemaFileImpl context) {
+      try { // the tag offset recorded in the plan is passed on as the offset
+        recoverTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) { // failures are returned as results; applyMLog logs them
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitCreateAlignedTimeSeries(
+        ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan,
+        SchemaRegionSchemaFileImpl context) {
+      try {
+        recoverAlignedTimeSeries(createAlignedTimeSeriesPlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitDeleteTimeSeries(
+        IDeleteTimeSeriesPlan deleteTimeSeriesPlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        // each logged DeleteTimeSeriesPlan holds exactly one path, so only index 0 is read
+        deleteOneTimeseriesUpdateStatisticsAndDropTrigger(
+            deleteTimeSeriesPlan.getDeletePathList().get(0));
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException | IOException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitChangeAlias(
+        IChangeAliasPlan changeAliasPlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitChangeTagOffset(
+        IChangeTagOffsetPlan changeTagOffsetPlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitAutoCreateDeviceMNode(
+        IAutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitSetTemplate(
+        ISetTemplatePlan setTemplatePlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        setSchemaTemplate(setTemplatePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitUnsetTemplate(
+        IUnsetTemplatePlan unsetTemplatePlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        unsetSchemaTemplate(unsetTemplatePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+
+    @Override
+    public RecoverOperationResult visitActivateTemplate(
+        IActivateTemplatePlan activateTemplatePlan, SchemaRegionSchemaFileImpl context) {
+      try {
+        setUsingSchemaTemplate(activateTemplatePlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index c6d1b51af6..cf3ab351f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanFactory;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -40,13 +43,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -66,8 +64,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
   @Override
   public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
     try {
-      PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
-      schemaRegion.createTimeseries((CreateTimeSeriesPlan) plan, -1);
+      schemaRegion.createTimeseries(node, -1);
     } catch (MetadataException e) {
       logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
@@ -79,8 +76,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
   public TSStatus visitCreateAlignedTimeSeries(
       CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
     try {
-      PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
-      schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+      schemaRegion.createAlignedTimeSeries(node);
     } catch (MetadataException e) {
       logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
@@ -118,9 +114,9 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
   }
 
-  private CreateTimeSeriesPlan transformToCreateTimeSeriesPlan(
+  private ICreateTimeSeriesPlan transformToCreateTimeSeriesPlan(
       PartialPath devicePath, MeasurementGroup measurementGroup, int index) {
-    return new CreateTimeSeriesPlan(
+    return SchemaRegionPlanFactory.getCreateTimeSeriesPlan(
         devicePath.concatNode(measurementGroup.getMeasurements().get(index)),
         measurementGroup.getDataTypes().get(index),
         measurementGroup.getEncodings().get(index),
@@ -198,8 +194,8 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
     List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
     List<TSEncoding> encodingList = measurementGroup.getEncodings();
     List<CompressionType> compressionTypeList = measurementGroup.getCompressors();
-    CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
-        new CreateAlignedTimeSeriesPlan(
+    ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
+        SchemaRegionPlanFactory.getCreateAlignedTimeSeriesPlan(
             devicePath,
             measurementList,
             dataTypeList,
@@ -280,12 +276,9 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
   @Override
   public TSStatus visitActivateTemplate(ActivateTemplateNode node, ISchemaRegion schemaRegion) {
     try {
-      ActivateTemplateInClusterPlan plan =
-          (ActivateTemplateInClusterPlan)
-              new PhysicalPlanTransformer().visitActivateTemplate(node, new TransformerContext());
       Template template = ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
-      plan.setAligned(template.isDirectAligned());
-      schemaRegion.activateSchemaTemplate(plan, template);
+      node.setAligned(template.isDirectAligned());
+      schemaRegion.activateSchemaTemplate(node, template);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (MetadataException e) {
       logger.error(e.getMessage(), e);
@@ -332,48 +325,4 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
   public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
     return null;
   }
-
-  // TODO need remove
-  private static class PhysicalPlanTransformer
-      extends PlanVisitor<PhysicalPlan, TransformerContext> {
-    @Override
-    public PhysicalPlan visitPlan(PlanNode node, TransformerContext context) {
-      throw new NotImplementedException();
-    }
-
-    public PhysicalPlan visitCreateTimeSeries(
-        CreateTimeSeriesNode node, TransformerContext context) {
-      return new CreateTimeSeriesPlan(
-          node.getPath(),
-          node.getDataType(),
-          node.getEncoding(),
-          node.getCompressor(),
-          node.getProps(),
-          node.getTags(),
-          node.getAttributes(),
-          node.getAlias());
-    }
-
-    public PhysicalPlan visitCreateAlignedTimeSeries(
-        CreateAlignedTimeSeriesNode node, TransformerContext context) {
-      return new CreateAlignedTimeSeriesPlan(
-          node.getDevicePath(),
-          node.getMeasurements(),
-          node.getDataTypes(),
-          node.getEncodings(),
-          node.getCompressors(),
-          node.getAliasList(),
-          node.getTagsList(),
-          node.getAttributesList());
-    }
-
-    @Override
-    public PhysicalPlan visitActivateTemplate(
-        ActivateTemplateNode node, TransformerContext context) {
-      return new ActivateTemplateInClusterPlan(
-          node.getActivatePath(), node.getTemplateSetLevel(), node.getTemplateId());
-    }
-  }
-
-  private static class TransformerContext {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
index 730836e383..4d7bf8acf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -38,12 +39,14 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class ActivateTemplateNode extends WritePlanNode {
+public class ActivateTemplateNode extends WritePlanNode implements IActivateTemplateInClusterPlan {
 
   private PartialPath activatePath;
   private int templateSetLevel;
   private int templateId;
 
+  private boolean isAligned;
+
   private TRegionReplicaSet regionReplicaSet;
 
   public ActivateTemplateNode(
@@ -78,6 +81,16 @@ public class ActivateTemplateNode extends WritePlanNode {
     this.templateId = templateId;
   }
 
+  @Override
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    this.isAligned = aligned;
+  }
+
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
     return regionReplicaSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 207da2f653..d3891e8b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -44,7 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class CreateAlignedTimeSeriesNode extends WritePlanNode {
+public class CreateAlignedTimeSeriesNode extends WritePlanNode
+    implements ICreateAlignedTimeSeriesPlan {
   private PartialPath devicePath;
   private List<String> measurements;
   private List<TSDataType> dataTypes;
@@ -53,6 +55,11 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
   private List<String> aliasList;
   private List<Map<String, String>> tagsList;
   private List<Map<String, String>> attributesList;
+
+  // only used inside schemaRegion to be serialized to mlog, no need to be serialized for mpp
+  // transport
+  private List<Long> tagOffsets = null;
+
   private TRegionReplicaSet regionReplicaSet;
 
   public CreateAlignedTimeSeriesNode(
@@ -140,6 +147,22 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
     this.attributesList = attributesList;
   }
 
+  @Override
+  public List<Long> getTagOffsets() {
+    if (tagOffsets == null) { // lazily build one -1 entry per measurement on first access
+      tagOffsets = new ArrayList<>(measurements.size());
+      for (int i = 0; i < measurements.size(); i++) {
+        tagOffsets.add(-1L); // literal instead of re-parsing the string "-1" each iteration
+      }
+    }
+    return tagOffsets;
+  }
+
+  @Override
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 608c8550c9..0880ec2676 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -45,7 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class CreateTimeSeriesNode extends WritePlanNode {
+public class CreateTimeSeriesNode extends WritePlanNode implements ICreateTimeSeriesPlan {
   private PartialPath path;
   private TSDataType dataType;
   private TSEncoding encoding;
@@ -55,6 +56,10 @@ public class CreateTimeSeriesNode extends WritePlanNode {
   private Map<String, String> tags = null;
   private Map<String, String> attributes = null;
 
+  // only used inside schemaRegion to be serialized to mlog, no need to be serialized for mpp
+  // transport
+  private long tagOffset = -1;
+
   private TRegionReplicaSet regionReplicaSet;
 
   public CreateTimeSeriesNode(
@@ -145,6 +150,16 @@ public class CreateTimeSeriesNode extends WritePlanNode {
     this.props = props;
   }
 
+  @Override
+  public long getTagOffset() {
+    return tagOffset;
+  }
+
+  @Override
+  public void setTagOffset(long tagOffset) {
+    this.tagOffset = tagOffset;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplateInClusterPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplateInClusterPlan.java
index 98d5cfab9c..5b6484ba6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplateInClusterPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplateInClusterPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -32,7 +33,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public class ActivateTemplateInClusterPlan extends PhysicalPlan {
+public class ActivateTemplateInClusterPlan extends PhysicalPlan
+    implements IActivateTemplateInClusterPlan {
 
   private PartialPath activatePath;
   private int templateSetLevel;
@@ -54,6 +56,11 @@ public class ActivateTemplateInClusterPlan extends PhysicalPlan {
     return activatePath;
   }
 
+  @Override
+  public void setActivatePath(PartialPath activatePath) {
+    this.activatePath = activatePath;
+  }
+
   public PartialPath getPathSetTemplate() {
     return new PartialPath(Arrays.copyOf(activatePath.getNodes(), templateSetLevel + 1));
   }
@@ -62,10 +69,20 @@ public class ActivateTemplateInClusterPlan extends PhysicalPlan {
     return templateId;
   }
 
+  @Override
+  public void setTemplateId(int templateId) {
+    this.templateId = templateId; // must write the field that getTemplateId() returns
+  }
+
   public int getTemplateSetLevel() {
     return templateSetLevel;
   }
 
+  @Override
+  public void setTemplateSetLevel(int templateSetLevel) {
+    this.templateSetLevel = templateSetLevel; // must write the field getTemplateSetLevel() returns
+  }
+
   public boolean isAligned() {
     return isAligned;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplatePlan.java
index 467ce34528..6c893a985d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplatePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplatePlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -33,7 +34,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class ActivateTemplatePlan extends PhysicalPlan {
+public class ActivateTemplatePlan extends PhysicalPlan implements IActivateTemplatePlan {
 
   private static final Logger logger = LoggerFactory.getLogger(ActivateTemplatePlan.class);
   PartialPath prefixPath;
@@ -56,6 +57,11 @@ public class ActivateTemplatePlan extends PhysicalPlan {
     return prefixPath;
   }
 
+  @Override
+  public void setPrefixPath(PartialPath prefixPath) {
+    this.prefixPath = prefixPath;
+  }
+
   @Override
   public void serializeImpl(ByteBuffer buffer) {
     buffer.put((byte) PhysicalPlanType.ACTIVATE_TEMPLATE.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java
index 7a67badbb3..cb8799ac2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
@@ -33,7 +34,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
-public class AutoCreateDeviceMNodePlan extends PhysicalPlan {
+public class AutoCreateDeviceMNodePlan extends PhysicalPlan implements IAutoCreateDeviceMNodePlan {
 
   private static final Logger logger = LoggerFactory.getLogger(AutoCreateDeviceMNodePlan.class);
   protected PartialPath path;
@@ -56,6 +57,11 @@ public class AutoCreateDeviceMNodePlan extends PhysicalPlan {
     return path;
   }
 
+  @Override
+  public void setPath(PartialPath path) {
+    this.path = path;
+  }
+
   @Override
   public void serializeImpl(ByteBuffer buffer) {
     buffer.put((byte) PhysicalPlanType.AUTO_CREATE_DEVICE_MNODE.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
index c44b619bb8..4a8aa2223d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
@@ -31,7 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class ChangeAliasPlan extends PhysicalPlan {
+public class ChangeAliasPlan extends PhysicalPlan implements IChangeAliasPlan {
   private PartialPath path;
   private String alias;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
index dee407c2b5..7d0114b3e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
@@ -31,7 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class ChangeTagOffsetPlan extends PhysicalPlan {
+public class ChangeTagOffsetPlan extends PhysicalPlan implements IChangeTagOffsetPlan {
   private PartialPath path;
   private long offset;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java
index a8024c5504..99af3ad2a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateAlignedTimeSeriesPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -41,12 +42,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
+public class CreateAlignedTimeSeriesPlan extends PhysicalPlan
+    implements ICreateAlignedTimeSeriesPlan {
 
   private static final Logger logger = LoggerFactory.getLogger(CreateAlignedTimeSeriesPlan.class);
   private static final int PLAN_SINCE_0_14 = -1;
 
-  private PartialPath prefixPath;
+  private PartialPath devicePath;
   private List<String> measurements;
   private List<TSDataType> dataTypes;
   private List<TSEncoding> encodings;
@@ -62,7 +64,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
   }
 
   public CreateAlignedTimeSeriesPlan(
-      PartialPath prefixPath,
+      PartialPath devicePath,
       List<String> measurements,
       List<TSDataType> dataTypes,
       List<TSEncoding> encodings,
@@ -71,7 +73,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
       List<Map<String, String>> tagsList,
       List<Map<String, String>> attributesList) {
     super(Operator.OperatorType.CREATE_ALIGNED_TIMESERIES);
-    this.prefixPath = prefixPath;
+    this.devicePath = devicePath;
     this.measurements = measurements;
     this.dataTypes = dataTypes;
     this.encodings = encodings;
@@ -83,9 +85,9 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
   }
 
   public CreateAlignedTimeSeriesPlan(
-      PartialPath prefixPath, String measurement, MeasurementSchema schema) {
+      PartialPath devicePath, String measurement, MeasurementSchema schema) {
     super(Operator.OperatorType.CREATE_ALIGNED_TIMESERIES);
-    this.prefixPath = prefixPath;
+    this.devicePath = devicePath;
     this.measurements = Collections.singletonList(measurement);
     this.dataTypes = Collections.singletonList(schema.getType());
     this.encodings = Collections.singletonList(schema.getEncodingType());
@@ -93,12 +95,12 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
     this.canBeSplit = false;
   }
 
-  public PartialPath getPrefixPath() {
-    return prefixPath;
+  public PartialPath getDevicePath() {
+    return devicePath;
   }
 
-  public void setPrefixPath(PartialPath prefixPath) {
-    this.prefixPath = prefixPath;
+  public void setDevicePath(PartialPath devicePath) {
+    this.devicePath = devicePath;
   }
 
   public List<String> getMeasurements() {
@@ -175,7 +177,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
   public String toString() {
     return String.format(
         "devicePath: %s, measurements: %s, dataTypes: %s, encodings: %s, compressions: %s, tagOffsets: %s",
-        prefixPath, measurements, dataTypes, encodings, compressors, tagOffsets);
+        devicePath, measurements, dataTypes, encodings, compressors, tagOffsets);
   }
 
   @Override
@@ -183,7 +185,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
     List<PartialPath> paths = new ArrayList<>();
     for (String measurement : measurements) {
       try {
-        paths.add(new PartialPath(prefixPath.getFullPath(), measurement));
+        paths.add(new PartialPath(devicePath.getFullPath(), measurement));
       } catch (IllegalPathException e) {
         logger.error("Failed to get paths of CreateAlignedTimeSeriesPlan. ", e);
       }
@@ -198,7 +200,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
     // distinguish the plan from that of old versions
     stream.writeInt(PLAN_SINCE_0_14);
 
-    byte[] bytes = prefixPath.getFullPath().getBytes();
+    byte[] bytes = devicePath.getFullPath().getBytes();
     stream.writeInt(bytes.length);
     stream.write(bytes);
 
@@ -265,7 +267,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
     // distinguish the plan from that of old versions
     buffer.putInt(PLAN_SINCE_0_14);
 
-    byte[] bytes = prefixPath.getFullPath().getBytes();
+    byte[] bytes = devicePath.getFullPath().getBytes();
     buffer.putInt(bytes.length);
     buffer.put(bytes);
 
@@ -322,7 +324,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
   public void formerSerialize(ByteBuffer buffer) {
     buffer.put((byte) PhysicalPlanType.CREATE_ALIGNED_TIMESERIES.ordinal());
 
-    byte[] bytes = prefixPath.getFullPath().getBytes();
+    byte[] bytes = devicePath.getFullPath().getBytes();
     buffer.putInt(bytes.length);
     buffer.put(bytes);
 
@@ -365,7 +367,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
     byte[] bytes = new byte[length];
     buffer.get(bytes);
 
-    prefixPath = new PartialPath(new String(bytes));
+    devicePath = new PartialPath(new String(bytes));
     int size = ReadWriteIOUtils.readInt(buffer);
     measurements = new ArrayList<>();
     for (int i = 0; i < size; i++) {
@@ -429,7 +431,7 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
     }
     CreateAlignedTimeSeriesPlan that = (CreateAlignedTimeSeriesPlan) o;
 
-    return Objects.equals(prefixPath, that.prefixPath)
+    return Objects.equals(devicePath, that.devicePath)
         && Objects.equals(measurements, that.measurements)
         && Objects.equals(dataTypes, that.dataTypes)
         && Objects.equals(encodings, that.encodings)
@@ -439,6 +441,6 @@ public class CreateAlignedTimeSeriesPlan extends PhysicalPlan {
 
   @Override
   public int hashCode() {
-    return Objects.hash(prefixPath, measurements, dataTypes, encodings, compressors, tagOffsets);
+    return Objects.hash(devicePath, measurements, dataTypes, encodings, compressors, tagOffsets);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
index 6d0c902c8b..c54d29d95f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -37,7 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 
-public class CreateTimeSeriesPlan extends PhysicalPlan {
+public class CreateTimeSeriesPlan extends PhysicalPlan implements ICreateTimeSeriesPlan {
 
   private PartialPath path;
   private TSDataType dataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 19f4284046..e1fa98fc1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion.TimePartitionFilter;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
@@ -34,7 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class DeleteTimeSeriesPlan extends PhysicalPlan {
+public class DeleteTimeSeriesPlan extends PhysicalPlan implements IDeleteTimeSeriesPlan {
 
   private List<PartialPath> deletePathList;
   private Map<Integer, TSStatus> results = new TreeMap<>();
@@ -59,6 +60,11 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
     return deletePathList;
   }
 
+  @Override
+  public List<PartialPath> getDeletePathList() {
+    return deletePathList;
+  }
+
   public void setDeletePathList(List<PartialPath> deletePathList) {
     this.deletePathList = deletePathList;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PreDeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PreDeleteTimeSeriesPlan.java
index 0b2737a23b..ecd92e1035 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PreDeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/PreDeleteTimeSeriesPlan.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
@@ -31,7 +32,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
-public class PreDeleteTimeSeriesPlan extends PhysicalPlan {
+public class PreDeleteTimeSeriesPlan extends PhysicalPlan implements IPreDeleteTimeSeriesPlan {
 
   private PartialPath path;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/RollbackPreDeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/RollbackPreDeleteTimeSeriesPlan.java
index 9677dfdfbc..b8def0a012 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/RollbackPreDeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/RollbackPreDeleteTimeSeriesPlan.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.qp.physical.sys;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
@@ -31,7 +32,8 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
-public class RollbackPreDeleteTimeSeriesPlan extends PhysicalPlan {
+public class RollbackPreDeleteTimeSeriesPlan extends PhysicalPlan
+    implements IRollbackPreDeleteTimeSeriesPlan {
 
   private PartialPath path;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java
index 5ab66113e8..6a216ac323 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTemplatePlan.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -32,7 +33,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class SetTemplatePlan extends PhysicalPlan {
+public class SetTemplatePlan extends PhysicalPlan implements ISetTemplatePlan {
   String templateName;
   String prefixPath;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java
index f10560801e..252790d4a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/UnsetTemplatePlan.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -33,7 +34,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class UnsetTemplatePlan extends PhysicalPlan {
+public class UnsetTemplatePlan extends PhysicalPlan implements IUnsetTemplatePlan {
 
   String prefixPath;
   String templateName;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/schema/MLogParser.java b/server/src/main/java/org/apache/iotdb/db/tools/schema/MLogParser.java
index b1f2a60431..18b15a95ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/schema/MLogParser.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/schema/MLogParser.java
@@ -18,29 +18,13 @@
  */
 package org.apache.iotdb.db.tools.schema;
 
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.metadata.logfile.MLogReader;
-import org.apache.iotdb.db.metadata.logfile.MLogTxtWriter;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
-import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
-import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
-import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.metadata.logfile.BufferedSerializer;
+import org.apache.iotdb.db.metadata.logfile.FakeCRC32Deserializer;
+import org.apache.iotdb.db.metadata.logfile.SchemaLogReader;
+import org.apache.iotdb.db.metadata.logfile.SchemaLogWriter;
+import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanTxtSerializer;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -162,90 +146,18 @@ public class MLogParser {
   }
 
   public static void parseFromFile(String inputFile, String outputFile) throws IOException {
-    try (MLogReader mLogReader = new MLogReader(inputFile);
-        MLogTxtWriter mLogTxtWriter = new MLogTxtWriter(outputFile)) {
-
+    try (SchemaLogReader<ISchemaRegionPlan> mLogReader = // source: plans decoded from inputFile
+            new SchemaLogReader<>(
+                inputFile, new FakeCRC32Deserializer<>(new SchemaRegionPlanDeserializer()));
+        SchemaLogWriter<ISchemaRegionPlan> mLogTxtWriter = // sink: outputFile, via SchemaRegionPlanTxtSerializer
+            new SchemaLogWriter<>( // NOTE(review): meaning of the trailing 'false' argument is not visible here - confirm
+                outputFile, new BufferedSerializer<>(new SchemaRegionPlanTxtSerializer()), false)) {
+      ISchemaRegionPlan plan; // holds the plan currently being copied from reader to writer
        while (mLogReader.hasNext()) {
-        PhysicalPlan plan = mLogReader.next();
-        switch (plan.getOperatorType()) {
-          case CREATE_TIMESERIES:
-            mLogTxtWriter.createTimeseries(
-                (CreateTimeSeriesPlan) plan, ((CreateTimeSeriesPlan) plan).getTagOffset());
-            break;
-          case CREATE_ALIGNED_TIMESERIES:
-            mLogTxtWriter.createAlignedTimeseries((CreateAlignedTimeSeriesPlan) plan);
-            break;
-          case DELETE_TIMESERIES:
-            for (PartialPath partialPath : plan.getPaths()) {
-              mLogTxtWriter.deleteTimeseries(partialPath.getFullPath());
-            }
-            break;
-          case SET_STORAGE_GROUP:
-            mLogTxtWriter.setStorageGroup(((SetStorageGroupPlan) plan).getPath().getFullPath());
-            break;
-          case DELETE_STORAGE_GROUP:
-            for (PartialPath partialPath : plan.getPaths()) {
-              mLogTxtWriter.deleteStorageGroup(partialPath.getFullPath());
-            }
-            break;
-          case TTL:
-            mLogTxtWriter.setTTL(
-                ((SetTTLPlan) plan).getStorageGroup().getFullPath(),
-                ((SetTTLPlan) plan).getDataTTL());
-            break;
-          case CHANGE_ALIAS:
-            mLogTxtWriter.changeAlias(
-                ((ChangeAliasPlan) plan).getPath().getFullPath(),
-                ((ChangeAliasPlan) plan).getAlias());
-            break;
-          case CHANGE_TAG_OFFSET:
-            mLogTxtWriter.changeOffset(
-                ((ChangeTagOffsetPlan) plan).getPath().getFullPath(),
-                ((ChangeTagOffsetPlan) plan).getOffset());
-            break;
-          case MEASUREMENT_MNODE:
-            mLogTxtWriter.serializeMeasurementMNode((MeasurementMNodePlan) plan);
-            break;
-          case STORAGE_GROUP_MNODE:
-            mLogTxtWriter.serializeStorageGroupMNode((StorageGroupMNodePlan) plan);
-            break;
-          case MNODE:
-            mLogTxtWriter.serializeMNode((MNodePlan) plan);
-            break;
-          case CREATE_CONTINUOUS_QUERY:
-            mLogTxtWriter.createContinuousQuery((CreateContinuousQueryPlan) plan);
-            break;
-          case DROP_CONTINUOUS_QUERY:
-            mLogTxtWriter.dropContinuousQuery((DropContinuousQueryPlan) plan);
-            break;
-          case CREATE_TEMPLATE:
-            mLogTxtWriter.createSchemaTemplate((CreateTemplatePlan) plan);
-            break;
-          case APPEND_TEMPLATE:
-            mLogTxtWriter.appendTemplate((AppendTemplatePlan) plan);
-            break;
-          case PRUNE_TEMPLATE:
-            mLogTxtWriter.pruneTemplate((PruneTemplatePlan) plan);
-          case SET_TEMPLATE:
-            mLogTxtWriter.setTemplate((SetTemplatePlan) plan);
-            break;
-          case UNSET_TEMPLATE:
-            mLogTxtWriter.unsetTemplate((UnsetTemplatePlan) plan);
-            break;
-          case DROP_TEMPLATE:
-            mLogTxtWriter.dropTemplate((DropTemplatePlan) plan);
-            break;
-          case ACTIVATE_TEMPLATE:
-            mLogTxtWriter.setUsingTemplate((ActivateTemplatePlan) plan);
-            break;
-          case AUTO_CREATE_DEVICE_MNODE:
-            mLogTxtWriter.autoCreateDeviceNode(
-                ((AutoCreateDeviceMNodePlan) plan).getPath().getFullPath());
-            break;
-          default:
-            logger.warn("unknown plan {}", plan);
-        }
+        plan = mLogReader.next();
+        mLogTxtWriter.write(plan); // every plan goes through the same write call; no per-type dispatch
        }
+      mLogTxtWriter.force(); // NOTE(review): presumably flushes written plans to disk before close - confirm
      }
    }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/plan/SchemaRegionPlanCompatibilityTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/plan/SchemaRegionPlanCompatibilityTest.java
new file mode 100644
index 0000000000..708a890fd9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/plan/SchemaRegionPlanCompatibilityTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.plan;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanType;
+import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IActivateTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IAutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeAliasPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IChangeTagOffsetPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IRollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.ISetTemplatePlan;
+import org.apache.iotdb.db.metadata.plan.schemaregion.write.IUnsetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.PreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.RollbackPreDeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaRegionPlanCompatibilityTest { // old PhysicalPlan bytes must deserialize as new plans
+
+  private static final SchemaRegionPlanDeserializer DESERIALIZER =
+      new SchemaRegionPlanDeserializer();
+  private static final ByteBuffer BUFFER = ByteBuffer.allocate(1024); // scratch buffer shared by all tests
+
+  private static int serializeToBuffer(PhysicalPlan oldPlan) { // returns the position reached by the write
+    BUFFER.clear(); // position = 0, limit = capacity, so each call starts from an empty buffer
+    oldPlan.serialize(BUFFER);
+    int position = BUFFER.position(); // captured here because flip() resets the position to 0
+    BUFFER.flip(); // limit = end of written data, position = 0: ready for reading
+    return position;
+  }
+
+  private static <T> T deserializeFromBuffer() {
+    return (T) DESERIALIZER.deserialize(BUFFER); // unchecked cast: the caller's variable type picks T
+  }
+
+  private static int getCurrentBufferPosition() { // tests call this after deserializing
+    return BUFFER.position();
+  }
+
+  @Test
+  public void testPlanTypeCompatibility() { // each new type must match the same-named PhysicalPlanType
+    for (SchemaRegionPlanType schemaRegionPlanType : SchemaRegionPlanType.values()) {
+      Assert.assertEquals(
+          schemaRegionPlanType.getPlanType(),
+          PhysicalPlan.PhysicalPlanType.valueOf(schemaRegionPlanType.name()).ordinal());
+    }
+  }
+
+  @Test
+  public void testActivateTemplateInClusterPlanSerializationCompatibility()
+      throws IllegalPathException { // old plan -> bytes -> IActivateTemplateInClusterPlan
+    ActivateTemplateInClusterPlan oldPlan = new ActivateTemplateInClusterPlan();
+    oldPlan.setActivatePath(new PartialPath("root.sg.d"));
+    oldPlan.setTemplateSetLevel(1);
+    oldPlan.setAligned(true);
+    oldPlan.setTemplateId(1);
+
+    int position = serializeToBuffer(oldPlan);
+
+    IActivateTemplateInClusterPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getActivatePath(), newPlan.getActivatePath());
+    Assert.assertEquals(oldPlan.getTemplateSetLevel(), newPlan.getTemplateSetLevel());
+    Assert.assertEquals(oldPlan.getTemplateId(), newPlan.getTemplateId());
+    Assert.assertEquals(oldPlan.isAligned(), newPlan.isAligned());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testActivateTemplatePlanSerializationCompatibility() throws IllegalPathException {
+    ActivateTemplatePlan oldPlan = new ActivateTemplatePlan();
+    oldPlan.setPrefixPath(new PartialPath("root.sg.d"));
+
+    int position = serializeToBuffer(oldPlan);
+
+    IActivateTemplatePlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPrefixPath(), newPlan.getPrefixPath());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testAutoCreateDeviceMNodePlanSerializationCompatibility()
+      throws IllegalPathException { // old plan -> bytes -> IAutoCreateDeviceMNodePlan
+    AutoCreateDeviceMNodePlan oldPlan = new AutoCreateDeviceMNodePlan();
+    oldPlan.setPath(new PartialPath("root.sg.d"));
+
+    int position = serializeToBuffer(oldPlan);
+
+    IAutoCreateDeviceMNodePlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPath(), newPlan.getPath());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testChangeAliasPlanSerializationCompatibility() throws IllegalPathException {
+    ChangeAliasPlan oldPlan = new ChangeAliasPlan();
+    oldPlan.setPath(new PartialPath("root.sg.d.s"));
+    oldPlan.setAlias("alias");
+
+    int position = serializeToBuffer(oldPlan);
+
+    IChangeAliasPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPath(), newPlan.getPath());
+    Assert.assertEquals(oldPlan.getAlias(), newPlan.getAlias());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testChangeTagOffsetPlanSerializationCompatibility() throws IllegalPathException {
+    ChangeTagOffsetPlan oldPlan = new ChangeTagOffsetPlan();
+    oldPlan.setPath(new PartialPath("root.sg.d.s"));
+    oldPlan.setOffset(10);
+
+    int position = serializeToBuffer(oldPlan);
+
+    IChangeTagOffsetPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPath(), newPlan.getPath());
+    Assert.assertEquals(oldPlan.getOffset(), newPlan.getOffset());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testCreateAlignedTimeSeriesPlanSerializationCompatibility()
+      throws IllegalPathException { // old plan -> bytes -> ICreateAlignedTimeSeriesPlan
+    CreateAlignedTimeSeriesPlan oldPlan = new CreateAlignedTimeSeriesPlan();
+    oldPlan.setDevicePath(new PartialPath("root.sg.d"));
+    oldPlan.setMeasurements(Arrays.asList("s1", "s2"));
+    oldPlan.setDataTypes(Arrays.asList(TSDataType.INT32, TSDataType.FLOAT));
+    oldPlan.setEncodings(Arrays.asList(TSEncoding.GORILLA, TSEncoding.BITMAP));
+    oldPlan.setCompressors(Arrays.asList(CompressionType.PLA, CompressionType.GZIP));
+    oldPlan.setAliasList(Arrays.asList("status", "temperature"));
+    Map<String, String> tagMap = new HashMap<>();
+    tagMap.put("tag-key", "tag-value");
+    oldPlan.setTagsList(Arrays.asList(null, tagMap)); // one null entry next to a populated map
+    Map<String, String> attributeMap = new HashMap<>();
+    attributeMap.put("attribute-key", "attribute-value");
+    oldPlan.setAttributesList(Arrays.asList(attributeMap, null)); // one populated map next to a null entry
+    oldPlan.setTagOffsets(Arrays.asList(10L, 20L));
+
+    int position = serializeToBuffer(oldPlan);
+
+    ICreateAlignedTimeSeriesPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getDevicePath(), newPlan.getDevicePath());
+    Assert.assertEquals(oldPlan.getMeasurements(), newPlan.getMeasurements());
+    Assert.assertEquals(oldPlan.getDataTypes(), newPlan.getDataTypes());
+    Assert.assertEquals(oldPlan.getEncodings(), newPlan.getEncodings());
+    Assert.assertEquals(oldPlan.getCompressors(), newPlan.getCompressors());
+    Assert.assertEquals(oldPlan.getAliasList(), newPlan.getAliasList());
+    Assert.assertEquals(oldPlan.getTagsList(), newPlan.getTagsList());
+    Assert.assertEquals(oldPlan.getAttributesList(), newPlan.getAttributesList());
+    Assert.assertEquals(oldPlan.getTagOffsets(), newPlan.getTagOffsets());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testCreateTimeSeriesPlanSerializationCompatibility() throws IllegalPathException {
+    CreateTimeSeriesPlan oldPlan = new CreateTimeSeriesPlan();
+    oldPlan.setPath(new PartialPath("root.sg.d.s"));
+    oldPlan.setDataType(TSDataType.DOUBLE);
+    oldPlan.setEncoding(TSEncoding.FREQ);
+    oldPlan.setCompressor(CompressionType.UNCOMPRESSED);
+    oldPlan.setAlias(null); // the null alias is part of the round trip compared below
+    Map<String, String> tagMap = new HashMap<>();
+    tagMap.put("tag-key", "tag-value");
+    oldPlan.setTags(tagMap);
+    Map<String, String> attributeMap = new HashMap<>();
+    attributeMap.put("attribute-key", "attribute-value");
+    oldPlan.setAttributes(attributeMap);
+    oldPlan.setTagOffset(30L);
+
+    int position = serializeToBuffer(oldPlan);
+
+    ICreateTimeSeriesPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPath(), newPlan.getPath());
+    Assert.assertEquals(oldPlan.getDataType(), newPlan.getDataType());
+    Assert.assertEquals(oldPlan.getEncoding(), newPlan.getEncoding());
+    Assert.assertEquals(oldPlan.getCompressor(), newPlan.getCompressor());
+    Assert.assertEquals(oldPlan.getAlias(), newPlan.getAlias());
+    Assert.assertEquals(oldPlan.getTags(), newPlan.getTags());
+    Assert.assertEquals(oldPlan.getAttributes(), newPlan.getAttributes());
+    Assert.assertEquals(oldPlan.getTagOffset(), newPlan.getTagOffset());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testDeleteTimeSeriesPlanSerializationCompatibility() throws IllegalPathException {
+    DeleteTimeSeriesPlan oldPlan = new DeleteTimeSeriesPlan();
+    oldPlan.setDeletePathList( // two paths, containing * and ** segments respectively
+        Arrays.asList(new PartialPath("root.sg.*.s"), new PartialPath("root.**.d.s")));
+
+    int position = serializeToBuffer(oldPlan);
+
+    IDeleteTimeSeriesPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getDeletePathList(), newPlan.getDeletePathList());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testPreDeleteTimeSeriesSerializationCompatibility() throws IllegalPathException {
+    PreDeleteTimeSeriesPlan oldPlan = new PreDeleteTimeSeriesPlan();
+    oldPlan.setPath(new PartialPath("root.**.s"));
+
+    int position = serializeToBuffer(oldPlan);
+
+    IPreDeleteTimeSeriesPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPath(), newPlan.getPath());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testRollbackPreDeleteTimeSeriesSerializationCompatibility()
+      throws IllegalPathException { // old plan -> bytes -> IRollbackPreDeleteTimeSeriesPlan
+    RollbackPreDeleteTimeSeriesPlan oldPlan = new RollbackPreDeleteTimeSeriesPlan();
+    oldPlan.setPath(new PartialPath("root.sg.**"));
+
+    int position = serializeToBuffer(oldPlan);
+
+    IRollbackPreDeleteTimeSeriesPlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPath(), newPlan.getPath());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testSetTemplatePlanSerializationCompatibility() {
+    SetTemplatePlan oldPlan = new SetTemplatePlan();
+    oldPlan.setPrefixPath("root.sg"); // this plan takes the prefix path as a plain String
+    oldPlan.setTemplateName("template");
+
+    int position = serializeToBuffer(oldPlan);
+
+    ISetTemplatePlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPrefixPath(), newPlan.getPrefixPath());
+    Assert.assertEquals(oldPlan.getTemplateName(), newPlan.getTemplateName());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+
+  @Test
+  public void testUnsetTemplatePlanSerializationCompatibility() {
+    UnsetTemplatePlan oldPlan = new UnsetTemplatePlan();
+    oldPlan.setPrefixPath("root.sg"); // this plan takes the prefix path as a plain String
+    oldPlan.setTemplateName("template");
+
+    int position = serializeToBuffer(oldPlan);
+
+    IUnsetTemplatePlan newPlan = deserializeFromBuffer();
+
+    Assert.assertEquals(oldPlan.getPrefixPath(), newPlan.getPrefixPath());
+    Assert.assertEquals(oldPlan.getTemplateName(), newPlan.getTemplateName());
+
+    Assert.assertEquals(position, getCurrentBufferPosition()); // read position must equal written length
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java b/server/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
index e854db8479..730b9ac67c 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/MLogParserTest.java
@@ -162,13 +162,9 @@ public class MLogParserTest {
 
     IoTDB.schemaProcessor.forceMlog();
 
-    testParseStorageGroupLog();
-
     for (int i = 0; i < storageGroups.length; i++) {
       testParseMLog(storageGroups[i], storageGroupIndex[i], mlogLineNum[i]);
     }
-
-    testParseTemplateLogFile();
   }
 
   private void testNonExistingStorageGroupDir(String storageGroup) {
@@ -180,19 +176,6 @@ public class MLogParserTest {
     Assert.assertFalse(storageGroupDir.exists());
   }
 
-  private void testParseStorageGroupLog() throws IOException {
-    testParseLog(config.getSchemaDir() + File.separator + MetadataConstant.STORAGE_GROUP_LOG, 7);
-  }
-
-  private void testParseTemplateLogFile() throws IOException {
-
-    testParseLog(
-        IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
-            + File.separator
-            + MetadataConstant.TEMPLATE_FILE,
-        1);
-  }
-
   private void testParseMLog(String storageGroup, int storageGroupId, int expectedLineNum)
       throws IOException {
     testParseLog(