Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/02 03:10:13 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5399] Implement batch auto create schema (#8936)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new bce665cd8b [To rel/1.0][IOTDB-5399] Implement batch auto create schema (#8936)
bce665cd8b is described below

commit bce665cd8b36807f05341d700c604b3bb7892c53
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu Feb 2 11:10:07 2023 +0800

    [To rel/1.0][IOTDB-5399] Implement batch auto create schema (#8936)
---
 .../iotdb/db/client/DataNodeInternalClient.java    |   4 +-
 .../metadata/visitor/SchemaExecutionVisitor.java   |  61 +++
 .../execution/executor/RegionWriteExecutor.java    | 139 ++++-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  44 ++
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |   2 +
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 599 ---------------------
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  |   1 +
 .../analyze/schema/AutoCreateSchemaExecutor.java   | 418 ++++++++++++++
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 211 ++++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 363 +++++++++++++
 .../plan/analyze/{ => schema}/ISchemaFetcher.java  |   2 +-
 .../plan/analyze/{ => schema}/SchemaValidator.java |   3 +-
 .../iotdb/db/mpp/plan/constant/StatementType.java  |   3 +
 .../db/mpp/plan/execution/QueryExecution.java      |   2 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  32 ++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  10 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  10 +
 .../write/InternalBatchActivateTemplateNode.java   | 162 ++++++
 .../write/InternalCreateMultiTimeSeriesNode.java   | 164 ++++++
 .../db/mpp/plan/scheduler/StandaloneScheduler.java |   2 +-
 .../db/mpp/plan/statement/StatementVisitor.java    |  12 +
 .../InternalBatchActivateTemplateStatement.java    |  59 ++
 .../InternalCreateMultiTimeSeriesStatement.java    |  57 ++
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |   4 +-
 .../protocol/rest/impl/GrafanaApiServiceImpl.java  |   4 +-
 .../db/protocol/rest/impl/RestApiServiceImpl.java  |   4 +-
 .../metrics/IoTDBInternalLocalReporter.java        |   4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   4 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java |   2 +-
 .../iotdb/db/sync/pipedata/load/ILoader.java       |   4 +-
 .../db/sync/transport/server/ReceiverManager.java  |   2 +-
 .../db/wal/recover/file/TsFilePlanRedoer.java      |   2 +-
 .../db/mpp/plan/StandaloneCoordinatorTest.java     |   2 +-
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   1 +
 .../iotdb/db/mpp/plan/plan/distribution/Util.java  |   2 +-
 37 files changed, 1774 insertions(+), 627 deletions(-)
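
For orientation: the core of this change is that missing schema discovered while analyzing inserts is now grouped per device and created (or template-activated) through single batched internal statements (InternalCreateMultiTimeSeriesStatement, InternalBatchActivateTemplateStatement), rather than one internal statement per device as in the deleted ClusterSchemaFetcher. Below is a minimal, self-contained sketch of that grouping idea only; MissingSeries and groupByDevice are hypothetical stand-ins, not the PartialPath/MeasurementGroup classes used in the diff.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchGroupingSketch {

      // Stand-in for one missing series discovered while analyzing an insert statement.
      static class MissingSeries {
        final String device;
        final String measurement;

        MissingSeries(String device, String measurement) {
          this.device = device;
          this.measurement = measurement;
        }
      }

      // Group all missing series by device so that one batched internal statement
      // can cover every device, instead of issuing one statement per device.
      static Map<String, List<String>> groupByDevice(List<MissingSeries> missing) {
        Map<String, List<String>> byDevice = new LinkedHashMap<>();
        for (MissingSeries s : missing) {
          byDevice.computeIfAbsent(s.device, d -> new ArrayList<>()).add(s.measurement);
        }
        return byDevice;
      }

      public static void main(String[] args) {
        List<MissingSeries> missing = new ArrayList<>();
        missing.add(new MissingSeries("root.sg.d1", "s1"));
        missing.add(new MissingSeries("root.sg.d1", "s2"));
        missing.add(new MissingSeries("root.sg.d2", "s1"));
        // Prints {root.sg.d1=[s1, s2], root.sg.d2=[s1]}: two devices, one batch.
        System.out.println(groupByDevice(missing));
      }
    }
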

diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 419af63350..ecab514495 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -29,11 +29,11 @@ import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 075e48db48..ca426250d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
 import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanFactory;
 import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
@@ -41,6 +42,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMulti
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
@@ -51,6 +54,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -162,6 +166,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
   }
 
+  @Override
+  public TSStatus visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    PartialPath devicePath;
+    MeasurementGroup measurementGroup;
+
+    List<TSStatus> alreadyExistingTimeseries = new ArrayList<>();
+    List<TSStatus> failingStatus = new ArrayList<>();
+
+    for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
+        node.getDeviceMap().entrySet()) {
+      devicePath = deviceEntry.getKey();
+      measurementGroup = deviceEntry.getValue().right;
+      if (deviceEntry.getValue().left) {
+        executeInternalCreateAlignedTimeseries(
+            devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
+      } else {
+        executeInternalCreateTimeseries(
+            devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
+      }
+    }
+
+    if (!failingStatus.isEmpty()) {
+      return RpcUtils.getStatus(failingStatus);
+    }
+
+    if (!alreadyExistingTimeseries.isEmpty()) {
+      return RpcUtils.getStatus(alreadyExistingTimeseries);
+    }
+
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+  }
+
   private void executeInternalCreateTimeseries(
       PartialPath devicePath,
       MeasurementGroup measurementGroup,
@@ -289,6 +326,30 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
     }
   }
 
+  @Override
+  public TSStatus visitInternalBatchActivateTemplate(
+      InternalBatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
+    for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+        node.getTemplateActivationMap().entrySet()) {
+      Template template = ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
+      try {
+        schemaRegion.activateSchemaTemplate(
+            SchemaRegionPlanFactory.getActivateTemplateInClusterPlan(
+                entry.getKey(), entry.getValue().right, entry.getValue().left),
+            template);
+      } catch (TemplateIsInUseException e) {
+        logger.info(
+            String.format(
+                "Schema template has already been activated on path %s, there's no need to activate again.",
+                entry.getKey()));
+      } catch (MetadataException e) {
+        logger.error(e.getMessage(), e);
+        return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+      }
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
   @Override
   public TSStatus visitConstructSchemaBlackList(
       ConstructSchemaBlackListNode node, ISchemaRegion schemaRegion) {
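
The two visitor methods added above aggregate per-device results with a simple precedence: any hard failure wins, otherwise "already exists" statuses are returned, otherwise plain success; and batch template activation treats TemplateIsInUseException as a benign, idempotent outcome. Here is a small stand-alone sketch of that activation loop, using simplified stand-in exception and region types rather than the real IoTDB classes:

    import java.util.Map;

    public class BatchActivateSketch {

      // Simplified stand-ins for TemplateIsInUseException / MetadataException.
      static class AlreadyActiveException extends Exception {}

      static class MetadataException extends Exception {
        MetadataException(String message) { super(message); }
      }

      interface SchemaRegionStub {
        void activateTemplate(String devicePath) throws AlreadyActiveException, MetadataException;
      }

      // Activate the template for every device in the batch; "already active" is
      // treated as success (idempotent), any other metadata error aborts the batch.
      static String activateBatch(SchemaRegionStub region, Map<String, Integer> templateActivationMap) {
        for (Map.Entry<String, Integer> entry : templateActivationMap.entrySet()) {
          try {
            region.activateTemplate(entry.getKey());
          } catch (AlreadyActiveException e) {
            // Benign: another writer activated the template on this path concurrently.
          } catch (MetadataException e) {
            return e.getMessage(); // report the first hard failure
          }
        }
        return null; // null means the whole batch succeeded
      }
    }
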
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c1f46626e7..b237707483 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -39,13 +39,15 @@ import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
@@ -67,6 +69,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -501,6 +504,107 @@ public class RegionWriteExecutor {
       }
     }
 
+    @Override
+    public RegionExecutionResult visitInternalCreateMultiTimeSeries(
+        InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+      ISchemaRegion schemaRegion =
+          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) context.getRegionId());
+      if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
+        context.getRegionWriteValidationRWLock().writeLock().lock();
+        try {
+          List<TSStatus> failingStatus = new ArrayList<>();
+          List<TSStatus> alreadyExistingStatus = new ArrayList<>();
+
+          MeasurementGroup measurementGroup;
+          Map<Integer, MetadataException> failingMeasurementMap;
+          MetadataException metadataException;
+          for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
+              node.getDeviceMap().entrySet()) {
+            measurementGroup = deviceEntry.getValue().right;
+            failingMeasurementMap =
+                schemaRegion.checkMeasurementExistence(
+                    deviceEntry.getKey(),
+                    measurementGroup.getMeasurements(),
+                    measurementGroup.getAliasList());
+            // filter failed measurement and keep the rest for execution
+            for (Map.Entry<Integer, MetadataException> failingMeasurement :
+                failingMeasurementMap.entrySet()) {
+              metadataException = failingMeasurement.getValue();
+              if (metadataException.getErrorCode()
+                  == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+                LOGGER.info(
+                    "There's no need to internal create timeseries. {}",
+                    failingMeasurement.getValue().getMessage());
+                alreadyExistingStatus.add(
+                    RpcUtils.getStatus(
+                        metadataException.getErrorCode(),
+                        MeasurementPath.transformDataToString(
+                            ((MeasurementAlreadyExistException) metadataException)
+                                .getMeasurementPath())));
+              } else {
+                LOGGER.error("Metadata error: ", metadataException);
+                failingStatus.add(
+                    RpcUtils.getStatus(
+                        metadataException.getErrorCode(), metadataException.getMessage()));
+              }
+            }
+            measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
+          }
+
+          RegionExecutionResult executionResult =
+              super.visitInternalCreateMultiTimeSeries(node, context);
+
+          if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+            return executionResult;
+          }
+
+          TSStatus executionStatus = executionResult.getStatus();
+
+          // handle the measurement_already_exist exception separately from other exceptions;
+          // measurement_already_exist is acceptable because timeseries may be created concurrently
+          if (failingStatus.isEmpty()) {
+            if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              if (executionStatus.getSubStatus().get(0).getCode()
+                  == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+                // there's only measurement_already_exist exception
+                alreadyExistingStatus.addAll(executionStatus.getSubStatus());
+              } else {
+                failingStatus.addAll(executionStatus.getSubStatus());
+              }
+            } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              failingStatus.add(executionStatus);
+            }
+          } else {
+            if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              if (executionStatus.getSubStatus().get(0).getCode()
+                  != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+                failingStatus.addAll(executionStatus.getSubStatus());
+              }
+            } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              failingStatus.add(executionStatus);
+            }
+          }
+
+          TSStatus status;
+          if (failingStatus.isEmpty()) {
+            status = RpcUtils.getStatus(alreadyExistingStatus);
+          } else {
+            status = RpcUtils.getStatus(failingStatus);
+          }
+
+          RegionExecutionResult result = new RegionExecutionResult();
+          result.setAccepted(false);
+          result.setMessage(status.getMessage());
+          result.setStatus(status);
+          return result;
+        } finally {
+          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        }
+      } else {
+        return super.visitInternalCreateMultiTimeSeries(node, context);
+      }
+    }
+
     @Override
     public RegionExecutionResult visitActivateTemplate(
         ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
@@ -527,6 +631,39 @@ public class RegionWriteExecutor {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
     }
+
+    @Override
+    public RegionExecutionResult visitInternalBatchActivateTemplate(
+        InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext context) {
+      // activate template operation shall be blocked by unset template check
+      context.getRegionWriteValidationRWLock().readLock().lock();
+      try {
+        for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+            node.getTemplateActivationMap().entrySet()) {
+          Pair<Template, PartialPath> templateSetInfo =
+              ClusterTemplateManager.getInstance().checkTemplateSetInfo(entry.getKey());
+          if (templateSetInfo == null) {
+            // The activation has already been validated during analyzing.
+            // That means the template is being unset during the activation plan transport.
+            RegionExecutionResult result = new RegionExecutionResult();
+            result.setAccepted(false);
+            String message =
+                String.format(
+                    "Template is being unset from prefix path of %s. Please try activating later.",
+                    new PartialPath(
+                            Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1))
+                        .getFullPath());
+            result.setMessage(message);
+            result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
+            return result;
+          }
+        }
+
+        return super.visitInternalBatchActivateTemplate(node, context);
+      } finally {
+        context.getRegionWriteValidationRWLock().readLock().unlock();
+      }
+    }
   }
 
   private static class WritePlanNodeExecutionContext {
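
The consensus-path handling above separates MULTIPLE_ERROR sub-statuses into benign "timeseries already exist" results (expected when several writers auto-create the same series concurrently) and real failures; note that the committed code takes a shortcut and classifies the whole sub-status list by inspecting only its first entry. The rough sketch below shows the general partitioning idea, with a placeholder status code rather than the real TSStatusCode values:

    import java.util.ArrayList;
    import java.util.List;

    public class StatusPartitionSketch {

      // Placeholder code; the real values come from TSStatusCode in IoTDB.
      static final int TIMESERIES_ALREADY_EXIST = 1;

      final List<Integer> alreadyExisting = new ArrayList<>();
      final List<Integer> failing = new ArrayList<>();

      // Split sub-statuses of a MULTIPLE_ERROR result into benign "already exists"
      // entries (acceptable under concurrent auto-creation) and real failures
      // that must be reported back to the caller.
      void partition(List<Integer> subStatusCodes) {
        for (int code : subStatusCodes) {
          if (code == TIMESERIES_ALREADY_EXIST) {
            alreadyExisting.add(code);
          } else {
            failing.add(code);
          }
        }
      }

      boolean batchAccepted() {
        // Only "already exists" results (or none at all) count as an acceptable outcome.
        return failing.isEmpty();
      }
    }
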
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 8d18d611c1..c9706cf049 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 637dfe73e2..42c7aedcc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -47,6 +47,8 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
@@ -77,6 +79,8 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -1603,6 +1607,26 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
+  @Override
+  public Analysis visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
+      MPPQueryContext context) {
+    context.setQueryType(QueryType.WRITE);
+
+    Analysis analysis = new Analysis();
+    analysis.setStatement(internalCreateMultiTimeSeriesStatement);
+
+    PathPatternTree pathPatternTree = new PathPatternTree();
+    for (PartialPath devicePath : internalCreateMultiTimeSeriesStatement.getDeviceMap().keySet()) {
+      pathPatternTree.appendFullPath(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD));
+    }
+
+    SchemaPartition schemaPartitionInfo;
+    schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(pathPatternTree);
+    analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+    return analysis;
+  }
+
   @Override
   public Analysis visitCreateMultiTimeseries(
       CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
@@ -2490,6 +2514,26 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
+  @Override
+  public Analysis visitInternalBatchActivateTemplate(
+      InternalBatchActivateTemplateStatement internalBatchActivateTemplateStatement,
+      MPPQueryContext context) {
+    context.setQueryType(QueryType.WRITE);
+    Analysis analysis = new Analysis();
+    analysis.setStatement(internalBatchActivateTemplateStatement);
+
+    PathPatternTree patternTree = new PathPatternTree();
+    for (PartialPath activatePath :
+        internalBatchActivateTemplateStatement.getDeviceMap().keySet()) {
+      patternTree.appendPathPattern(activatePath.concatNode(ONE_LEVEL_PATH_WILDCARD));
+    }
+    SchemaPartition partition = partitionFetcher.getOrCreateSchemaPartition(patternTree);
+
+    analysis.setSchemaPartitionInfo(partition);
+
+    return analysis;
+  }
+
   @Override
   public Analysis visitShowPathsUsingTemplate(
       ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 010b491e55..40d032b3dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 
 import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
deleted file mode 100644
index 16a49ae47a..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.analyze;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
-import org.apache.iotdb.db.metadata.template.ITemplateManager;
-import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
-import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
-public class ClusterSchemaFetcher implements ISchemaFetcher {
-
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
-  private final Coordinator coordinator = Coordinator.getInstance();
-  private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
-  private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
-
-  private static final class ClusterSchemaFetcherHolder {
-    private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
-
-    private ClusterSchemaFetcherHolder() {}
-  }
-
-  public static ClusterSchemaFetcher getInstance() {
-    return ClusterSchemaFetcherHolder.INSTANCE;
-  }
-
-  private ClusterSchemaFetcher() {}
-
-  @Override
-  public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, false);
-  }
-
-  @Override
-  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, true);
-  }
-
-  private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
-    Map<Integer, Template> templateMap = new HashMap<>();
-    patternTree.constructTree();
-    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
-    for (PartialPath pattern : pathPatternList) {
-      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
-    }
-
-    if (withTags) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
-    }
-
-    List<PartialPath> fullPathList = new ArrayList<>();
-    for (PartialPath pattern : pathPatternList) {
-      if (!pattern.hasWildcard()) {
-        fullPathList.add(pattern);
-      }
-    }
-
-    if (fullPathList.isEmpty()) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
-    }
-
-    // The schema cache R/W and fetch operation must be locked together thus the cache clean
-    // operation executed by delete timeseries will be effective.
-    schemaCache.takeReadLock();
-    try {
-      ClusterSchemaTree schemaTree;
-      if (fullPathList.size() == pathPatternList.size()) {
-        boolean isAllCached = true;
-        schemaTree = new ClusterSchemaTree();
-        ClusterSchemaTree cachedSchema;
-        Set<String> storageGroupSet = new HashSet<>();
-        for (PartialPath fullPath : fullPathList) {
-          cachedSchema = schemaCache.get(fullPath);
-          if (cachedSchema.isEmpty()) {
-            isAllCached = false;
-            break;
-          } else {
-            schemaTree.mergeSchemaTree(cachedSchema);
-            storageGroupSet.addAll(cachedSchema.getDatabases());
-          }
-        }
-        if (isAllCached) {
-          schemaTree.setDatabases(storageGroupSet);
-          return schemaTree;
-        }
-      }
-
-      schemaTree =
-          executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
-
-      // only cache the schema fetched by full path
-      List<MeasurementPath> measurementPathList;
-      for (PartialPath fullPath : fullPathList) {
-        measurementPathList = schemaTree.searchMeasurementPaths(fullPath).left;
-        if (measurementPathList.isEmpty()) {
-          continue;
-        }
-        schemaCache.put(
-            schemaTree.getBelongedDatabase(measurementPathList.get(0)), measurementPathList.get(0));
-      }
-      return schemaTree;
-    } finally {
-      schemaCache.releaseReadLock();
-    }
-  }
-
-  private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    try {
-      ExecutionResult executionResult =
-          coordinator.execute(
-              schemaFetchStatement,
-              queryId,
-              null,
-              "",
-              ClusterPartitionFetcher.getInstance(),
-              this,
-              config.getQueryTimeoutThreshold());
-      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new RuntimeException(
-            String.format(
-                "cannot fetch schema, status is: %s, msg is: %s",
-                executionResult.status.getCode(), executionResult.status.getMessage()));
-      }
-      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
-        ClusterSchemaTree result = new ClusterSchemaTree();
-        Set<String> databaseSet = new HashSet<>();
-        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-          // The query will be transited to FINISHED when invoking getBatchResult() at the last time
-          // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock;
-          try {
-            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-          } catch (IoTDBException e) {
-            throw new RuntimeException("Fetch Schema failed. ", e);
-          }
-          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
-            break;
-          }
-          Column column = tsBlock.get().getColumn(0);
-          for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, databaseSet);
-          }
-        }
-        result.setDatabases(databaseSet);
-        return result;
-      }
-    } finally {
-      coordinator.cleanupQueryExecution(queryId);
-    }
-  }
-
-  private void parseFetchedData(
-      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
-    InputStream inputStream = new ByteArrayInputStream(data.getValues());
-    try {
-      byte type = ReadWriteIOUtils.readByte(inputStream);
-      if (type == 0) {
-        int size = ReadWriteIOUtils.readInt(inputStream);
-        for (int i = 0; i < size; i++) {
-          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
-        }
-      } else if (type == 1) {
-        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
-      } else {
-        throw new RuntimeException(
-            new MetadataException("Failed to fetch schema because of unrecognized data"));
-      }
-    } catch (IOException e) {
-      // Totally memory operation. This case won't happen.
-    }
-  }
-
-  @Override
-  public ISchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath,
-      String[] measurements,
-      Function<Integer, TSDataType> getDataType,
-      boolean isAligned) {
-    // The schema cache R/W and fetch operation must be locked together thus the cache clean
-    // operation executed by delete timeseries will be effective.
-    schemaCache.takeReadLock();
-    try {
-      ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
-      List<Integer> indexOfMissingMeasurements =
-          checkMissingMeasurements(schemaTree, devicePath, measurements);
-
-      // all schema can be taken from cache
-      if (indexOfMissingMeasurements.isEmpty()) {
-        return schemaTree;
-      }
-
-      // try fetch the missing schema from remote and cache fetched schema
-      PathPatternTree patternTree = new PathPatternTree();
-      for (int index : indexOfMissingMeasurements) {
-        patternTree.appendFullPath(devicePath, measurements[index]);
-      }
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
-      if (!remoteSchemaTree.isEmpty()) {
-        schemaTree.mergeSchemaTree(remoteSchemaTree);
-        schemaCache.put(remoteSchemaTree);
-      }
-
-      if (!config.isAutoCreateSchemaEnabled()) {
-        return schemaTree;
-      }
-
-      // auto create the still missing schema and merge them into schemaTree
-      checkAndAutoCreateMissingMeasurements(
-          schemaTree,
-          devicePath,
-          indexOfMissingMeasurements,
-          measurements,
-          getDataType,
-          null,
-          null,
-          isAligned);
-
-      return schemaTree;
-    } finally {
-      schemaCache.releaseReadLock();
-    }
-  }
-
-  @Override
-  public ISchemaTree fetchSchemaListWithAutoCreate(
-      List<PartialPath> devicePathList,
-      List<String[]> measurementsList,
-      List<TSDataType[]> tsDataTypesList,
-      List<Boolean> isAlignedList) {
-    return fetchSchemaListWithAutoCreate(
-        devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList);
-  }
-
-  @Override
-  public ISchemaTree fetchSchemaListWithAutoCreate(
-      List<PartialPath> devicePathList,
-      List<String[]> measurementsList,
-      List<TSDataType[]> tsDataTypesList,
-      List<TSEncoding[]> encodingsList,
-      List<CompressionType[]> compressionTypesList,
-      List<Boolean> isAlignedList) {
-    // The schema cache R/W and fetch operation must be locked together thus the cache clean
-    // operation executed by delete timeseries will be effective.
-    schemaCache.takeReadLock();
-    try {
-      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-      PathPatternTree patternTree = new PathPatternTree();
-      List<List<Integer>> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size());
-      for (int i = 0; i < devicePathList.size(); i++) {
-        schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
-        List<Integer> indexOfMissingMeasurements =
-            checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
-        indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
-        for (int index : indexOfMissingMeasurements) {
-          patternTree.appendFullPath(devicePathList.get(i), measurementsList.get(i)[index]);
-        }
-      }
-
-      // all schema can be taken from cache
-      if (patternTree.isEmpty()) {
-        return schemaTree;
-      }
-
-      // try fetch the missing schema from remote and cache fetched schema
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
-      if (!remoteSchemaTree.isEmpty()) {
-        schemaTree.mergeSchemaTree(remoteSchemaTree);
-        schemaCache.put(remoteSchemaTree);
-      }
-
-      if (!config.isAutoCreateSchemaEnabled()) {
-        return schemaTree;
-      }
-
-      // auto create the still missing schema and merge them into schemaTree
-      for (int i = 0; i < devicePathList.size(); i++) {
-        int finalI = i;
-        checkAndAutoCreateMissingMeasurements(
-            schemaTree,
-            devicePathList.get(i),
-            indexOfMissingMeasurementsList.get(i),
-            measurementsList.get(i),
-            index -> tsDataTypesList.get(finalI)[index],
-            encodingsList == null ? null : encodingsList.get(i),
-            compressionTypesList == null ? null : compressionTypesList.get(i),
-            isAlignedList.get(i));
-      }
-      return schemaTree;
-    } finally {
-      schemaCache.releaseReadLock();
-    }
-  }
-
-  @Override
-  public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
-    return templateManager.checkTemplateSetInfo(path);
-  }
-
-  @Override
-  public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
-    return templateManager.checkAllRelatedTemplate(pathPattern);
-  }
-
-  @Override
-  public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
-    return templateManager.getAllPathsSetTemplate(templateName);
-  }
-
-  // check which measurements are missing and auto create the missing measurements and merge them
-  // into given schemaTree
-  private void checkAndAutoCreateMissingMeasurements(
-      ClusterSchemaTree schemaTree,
-      PartialPath devicePath,
-      List<Integer> indexOfMissingMeasurements,
-      String[] measurements,
-      Function<Integer, TSDataType> getDataType,
-      TSEncoding[] encodings,
-      CompressionType[] compressionTypes,
-      boolean isAligned) {
-    // check missing measurements
-    DeviceSchemaInfo deviceSchemaInfo =
-        schemaTree.searchDeviceSchemaInfo(
-            devicePath,
-            indexOfMissingMeasurements.stream()
-                .map(index -> measurements[index])
-                .collect(Collectors.toList()));
-    if (deviceSchemaInfo != null) {
-      List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
-      int removedCount = 0;
-      for (int i = 0, size = schemaList.size(); i < size; i++) {
-        if (schemaList.get(i) != null) {
-          indexOfMissingMeasurements.remove(i - removedCount);
-          removedCount++;
-        }
-      }
-    }
-    if (indexOfMissingMeasurements.isEmpty()) {
-      return;
-    }
-
-    // check whether there is template should be activated
-    Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
-    if (templateInfo != null) {
-      Template template = templateInfo.left;
-      boolean shouldActivateTemplate = false;
-      for (int index : indexOfMissingMeasurements) {
-        if (template.hasSchema(measurements[index])) {
-          shouldActivateTemplate = true;
-          break;
-        }
-      }
-
-      if (shouldActivateTemplate) {
-        internalActivateTemplate(devicePath);
-        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
-        for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
-          if (!template.hasSchema(measurements[i])) {
-            recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
-          }
-        }
-        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
-        for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
-          schemaTree.appendSingleMeasurement(
-              devicePath.concatNode(entry.getKey()),
-              (MeasurementSchema) entry.getValue(),
-              null,
-              null,
-              template.isDirectAligned());
-        }
-
-        if (indexOfMissingMeasurements.isEmpty()) {
-          return;
-        }
-      }
-    }
-
-    // auto create the rest missing timeseries
-    List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSDataType> dataTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSEncoding> encodingsOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<CompressionType> compressionTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    indexOfMissingMeasurements.forEach(
-        index -> {
-          TSDataType tsDataType = getDataType.apply(index);
-          // tsDataType == null means insert null value to a non-exist series
-          // should skip creating them
-          if (tsDataType != null) {
-            missingMeasurements.add(measurements[index]);
-            dataTypesOfMissingMeasurement.add(tsDataType);
-            encodingsOfMissingMeasurement.add(
-                encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
-            compressionTypesOfMissingMeasurement.add(
-                compressionTypes == null
-                    ? TSFileDescriptor.getInstance().getConfig().getCompressor()
-                    : compressionTypes[index]);
-          }
-        });
-
-    if (!missingMeasurements.isEmpty()) {
-      schemaTree.mergeSchemaTree(
-          internalCreateTimeseries(
-              devicePath,
-              missingMeasurements,
-              dataTypesOfMissingMeasurement,
-              encodingsOfMissingMeasurement,
-              compressionTypesOfMissingMeasurement,
-              isAligned));
-    }
-  }
-
-  private List<Integer> checkMissingMeasurements(
-      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
-    DeviceSchemaInfo deviceSchemaInfo =
-        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
-    if (deviceSchemaInfo == null) {
-      return IntStream.range(0, measurements.length).boxed().collect(Collectors.toList());
-    }
-
-    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
-    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
-    for (int i = 0; i < measurements.length; i++) {
-      if (schemaList.get(i) == null) {
-        indexOfMissingMeasurements.add(i);
-      }
-    }
-
-    return indexOfMissingMeasurements;
-  }
-
-  // try to create the target timeseries and return schemaTree involving successfully created
-  // timeseries and existing timeseries
-  private ClusterSchemaTree internalCreateTimeseries(
-      PartialPath devicePath,
-      List<String> measurements,
-      List<TSDataType> tsDataTypes,
-      List<TSEncoding> encodings,
-      List<CompressionType> compressors,
-      boolean isAligned) {
-    List<MeasurementPath> measurementPathList =
-        executeInternalCreateTimeseriesStatement(
-            new InternalCreateTimeSeriesStatement(
-                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
-
-    Set<Integer> alreadyExistingMeasurementIndexSet =
-        measurementPathList.stream()
-            .map(o -> measurements.indexOf(o.getMeasurement()))
-            .collect(Collectors.toSet());
-
-    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    schemaTree.appendMeasurementPaths(measurementPathList);
-
-    for (int i = 0, size = measurements.size(); i < size; i++) {
-      if (alreadyExistingMeasurementIndexSet.contains(i)) {
-        continue;
-      }
-
-      schemaTree.appendSingleMeasurement(
-          devicePath.concatNode(measurements.get(i)),
-          new MeasurementSchema(
-              measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
-          null,
-          null,
-          isAligned);
-    }
-
-    return schemaTree;
-  }
-
-  // auto create timeseries and return the existing timeseries info
-  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
-      InternalCreateTimeSeriesStatement statement) {
-
-    ExecutionResult executionResult = executeStatement(statement);
-
-    int statusCode = executionResult.status.getCode();
-    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return Collections.emptyList();
-    }
-
-    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      throw new RuntimeException(
-          new IoTDBException(executionResult.status.getMessage(), statusCode));
-    }
-
-    Set<String> failedCreationSet = new HashSet<>();
-    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
-    for (TSStatus subStatus : executionResult.status.subStatus) {
-      if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-        alreadyExistingMeasurements.add(
-            MeasurementPath.parseDataFromString(subStatus.getMessage()));
-      } else {
-        failedCreationSet.add(subStatus.message);
-      }
-    }
-
-    if (!failedCreationSet.isEmpty()) {
-      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
-    }
-
-    return alreadyExistingMeasurements;
-  }
-
-  public void internalActivateTemplate(PartialPath devicePath) {
-    ExecutionResult executionResult = executeStatement(new ActivateTemplateStatement(devicePath));
-    TSStatus status = executionResult.status;
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-      throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
-    }
-  }
-
-  private ExecutionResult executeStatement(Statement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    return coordinator.execute(
-        statement,
-        queryId,
-        null,
-        "",
-        ClusterPartitionFetcher.getInstance(),
-        this,
-        config.getQueryTimeoutThreshold());
-  }
-
-  @Override
-  public void invalidAllCache() {
-    DataNodeSchemaCache.getInstance().cleanUp();
-  }
-}
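
The deleted fetcher above (whose replacement lands in the new analyze/schema package, per the file list at the top) follows a cache-first pattern: serve the request entirely from the DataNode schema cache when every full-path lookup hits, otherwise run a remote schema-fetch query and cache what comes back. A compact sketch of that pattern with a generic cache interface, not the real DataNodeSchemaCache API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Function;

    public class CacheFirstFetchSketch<K, V> {

      private final Function<K, Optional<V>> cacheLookup;   // stand-in for the local schema cache
      private final Function<List<K>, List<V>> remoteFetch; // stand-in for the schema-fetch query

      CacheFirstFetchSketch(
          Function<K, Optional<V>> cacheLookup, Function<List<K>, List<V>> remoteFetch) {
        this.cacheLookup = cacheLookup;
        this.remoteFetch = remoteFetch;
      }

      // Return cached entries when every key hits the cache; otherwise fetch the
      // whole batch remotely (the real code also merges and re-caches the result).
      List<V> fetch(List<K> keys) {
        List<V> cached = new ArrayList<>(keys.size());
        for (K key : keys) {
          Optional<V> hit = cacheLookup.apply(key);
          if (!hit.isPresent()) {
            return remoteFetch.apply(keys); // at least one miss: go remote for the batch
          }
          cached.add(hit.get());
        }
        return cached;
      }
    }
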
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index f2ab109b79..d8b3cab4d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceGroupSchemaTree;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..4b4ef0f795
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+class AutoCreateSchemaExecutor {
+
+  private final ITemplateManager templateManager;
+  private final Function<Statement, ExecutionResult> statementExecutor;
+
+  AutoCreateSchemaExecutor(
+      ITemplateManager templateManager, Function<Statement, ExecutionResult> statementExecutor) {
+    this.templateManager = templateManager;
+    this.statementExecutor = statementExecutor;
+  }
+
+  // auto create the missing measurements and merge them into given schemaTree
+  void autoCreateMissingMeasurements(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean isAligned) {
+    // check whether there is a template that should be activated
+    Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
+    if (templateInfo != null) {
+      Template template = templateInfo.left;
+      List<Integer> indexOfMeasurementsNotInTemplate =
+          checkMeasurementsInSchemaTemplate(
+              devicePath, indexOfTargetMeasurements, measurements, isAligned, template);
+      if (indexOfMeasurementsNotInTemplate.size() < indexOfTargetMeasurements.size()) {
+        // there are measurements in schema template
+        internalActivateTemplate(devicePath);
+        for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
+          schemaTree.appendSingleMeasurement(
+              devicePath.concatNode(entry.getKey()),
+              (MeasurementSchema) entry.getValue(),
+              null,
+              null,
+              template.isDirectAligned());
+        }
+      }
+      if (indexOfMeasurementsNotInTemplate.isEmpty()) {
+        return;
+      }
+      // there are measurements that need to be created as normal timeseries
+      indexOfTargetMeasurements = indexOfMeasurementsNotInTemplate;
+    }
+
+    // auto create the rest missing timeseries
+    List<String> missingMeasurements = new ArrayList<>(indexOfTargetMeasurements.size());
+    List<TSDataType> dataTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfTargetMeasurements.size());
+    List<TSEncoding> encodingsOfMissingMeasurement =
+        new ArrayList<>(indexOfTargetMeasurements.size());
+    List<CompressionType> compressionTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfTargetMeasurements.size());
+    indexOfTargetMeasurements.forEach(
+        index -> {
+          TSDataType tsDataType = getDataType.apply(index);
+          // tsDataType == null means insert null value to a non-exist series
+          // should skip creating them
+          if (tsDataType != null) {
+            missingMeasurements.add(measurements[index]);
+            dataTypesOfMissingMeasurement.add(tsDataType);
+            encodingsOfMissingMeasurement.add(getDefaultEncoding(tsDataType));
+            compressionTypesOfMissingMeasurement.add(
+                TSFileDescriptor.getInstance().getConfig().getCompressor());
+          }
+        });
+
+    if (!missingMeasurements.isEmpty()) {
+      internalCreateTimeSeries(
+          schemaTree,
+          devicePath,
+          missingMeasurements,
+          dataTypesOfMissingMeasurement,
+          encodingsOfMissingMeasurement,
+          compressionTypesOfMissingMeasurement,
+          isAligned);
+    }
+  }
+
+  void autoCreateMissingMeasurements(
+      ClusterSchemaTree schemaTree,
+      List<PartialPath> devicePathList,
+      List<Integer> indexOfTargetDevices,
+      List<List<Integer>> indexOfTargetMeasurementsList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<TSEncoding[]> encodingsList,
+      List<CompressionType[]> compressionTypesList,
+      List<Boolean> isAlignedList) {
+    // check whether there is a template that should be activated
+
+    Map<PartialPath, Pair<Template, PartialPath>> devicesNeedActivateTemplate = new HashMap<>();
+    Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries =
+        new HashMap<>();
+    int deviceIndex;
+    PartialPath devicePath;
+    List<Integer> indexOfTargetMeasurements;
+    Pair<Template, PartialPath> templateInfo;
+    Template template;
+    List<Integer> indexOfMeasurementsNotInTemplate;
+    for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
+      deviceIndex = indexOfTargetDevices.get(i);
+      devicePath = devicePathList.get(deviceIndex);
+      indexOfTargetMeasurements = indexOfTargetMeasurementsList.get(i);
+
+      templateInfo = devicesNeedActivateTemplate.get(devicePath);
+      if (templateInfo == null) {
+        templateInfo = templateManager.checkTemplateSetInfo(devicePath);
+      }
+
+      if (templateInfo == null) {
+        indexOfMeasurementsNotInTemplate = indexOfTargetMeasurements;
+      } else {
+        template = templateInfo.left;
+        indexOfMeasurementsNotInTemplate =
+            checkMeasurementsInSchemaTemplate(
+                devicePath,
+                indexOfTargetMeasurements,
+                measurementsList.get(deviceIndex),
+                isAlignedList.get(deviceIndex),
+                template);
+        if (indexOfMeasurementsNotInTemplate.size() < indexOfTargetMeasurements.size()) {
+          // some of the target measurements are defined in the schema template
+          devicesNeedActivateTemplate.putIfAbsent(devicePath, templateInfo);
+        }
+      }
+
+      if (!indexOfMeasurementsNotInTemplate.isEmpty()) {
+        // there are measurements that need to be created as normal timeseries
+        int finalDeviceIndex = deviceIndex;
+        List<Integer> finalIndexOfMeasurementsNotInTemplate = indexOfMeasurementsNotInTemplate;
+        devicesNeedAutoCreateTimeSeries.compute(
+            devicePath,
+            (k, v) -> {
+              if (v == null) {
+                v = new Pair<>(isAlignedList.get(finalDeviceIndex), new MeasurementGroup());
+              }
+              MeasurementGroup measurementGroup = v.right;
+              String[] measurements = measurementsList.get(finalDeviceIndex);
+              TSDataType[] tsDataTypes = tsDataTypesList.get(finalDeviceIndex);
+              TSEncoding[] encodings =
+                  encodingsList == null ? null : encodingsList.get(finalDeviceIndex);
+              CompressionType[] compressionTypes =
+                  compressionTypesList == null ? null : compressionTypesList.get(finalDeviceIndex);
+              for (int measurementIndex : finalIndexOfMeasurementsNotInTemplate) {
+                if (tsDataTypes[measurementIndex] == null) {
+                  continue;
+                }
+                measurementGroup.addMeasurement(
+                    measurements[measurementIndex],
+                    tsDataTypes[measurementIndex],
+                    encodings == null
+                        ? getDefaultEncoding(tsDataTypes[measurementIndex])
+                        : encodings[measurementIndex],
+                    compressionTypes == null
+                        ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+                        : compressionTypes[measurementIndex]);
+              }
+              return v;
+            });
+      }
+    }
+
+    if (!devicesNeedActivateTemplate.isEmpty()) {
+      internalActivateTemplate(devicesNeedActivateTemplate);
+      for (Map.Entry<PartialPath, Pair<Template, PartialPath>> entry :
+          devicesNeedActivateTemplate.entrySet()) {
+        devicePath = entry.getKey();
+        template = entry.getValue().left;
+        for (Map.Entry<String, IMeasurementSchema> measurementEntry :
+            template.getSchemaMap().entrySet()) {
+          schemaTree.appendSingleMeasurement(
+              devicePath.concatNode(measurementEntry.getKey()),
+              (MeasurementSchema) measurementEntry.getValue(),
+              null,
+              null,
+              template.isDirectAligned());
+        }
+      }
+    }
+
+    if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
+      internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries);
+    }
+  }
+
+  private List<Integer> checkMeasurementsInSchemaTemplate(
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements,
+      boolean isAligned,
+      Template template) {
+    // check whether the template should be activated, i.e. whether any target measurement hits it
+    boolean shouldActivateTemplate = false;
+    for (int index : indexOfTargetMeasurements) {
+      if (template.hasSchema(measurements[index])) {
+        shouldActivateTemplate = true;
+        break;
+      }
+    }
+    if (shouldActivateTemplate) {
+      List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+      for (int index : indexOfTargetMeasurements) {
+        if (!template.hasSchema(measurements[index])) {
+          recheckedIndexOfMissingMeasurements.add(index);
+        }
+      }
+      return recheckedIndexOfMissingMeasurements;
+    } else {
+      return indexOfTargetMeasurements;
+    }
+  }
+
+  // try to create the target timeseries and merge the schemas of both the successfully
+  // created timeseries and the already existing timeseries into the given schemaTree
+  private void internalCreateTimeSeries(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    List<MeasurementPath> measurementPathList =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateTimeSeriesStatement(
+                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
+
+    Set<Integer> alreadyExistingMeasurementIndexSet =
+        measurementPathList.stream()
+            .map(o -> measurements.indexOf(o.getMeasurement()))
+            .collect(Collectors.toSet());
+
+    schemaTree.appendMeasurementPaths(measurementPathList);
+
+    for (int i = 0, size = measurements.size(); i < size; i++) {
+      if (alreadyExistingMeasurementIndexSet.contains(i)) {
+        continue;
+      }
+
+      schemaTree.appendSingleMeasurement(
+          devicePath.concatNode(measurements.get(i)),
+          new MeasurementSchema(
+              measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
+          null,
+          null,
+          isAligned);
+    }
+  }
+
+  // auto create the timeseries and return the info of the already existing timeseries
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(Statement statement) {
+
+    ExecutionResult executionResult = statementExecutor.apply(statement);
+
+    int statusCode = executionResult.status.getCode();
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(
+          new IoTDBException(executionResult.status.getMessage(), statusCode));
+    }
+
+    Set<String> failedCreationSet = new HashSet<>();
+    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
+    for (TSStatus subStatus : executionResult.status.subStatus) {
+      if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        alreadyExistingMeasurements.add(
+            MeasurementPath.parseDataFromString(subStatus.getMessage()));
+      } else {
+        failedCreationSet.add(subStatus.message);
+      }
+    }
+
+    if (!failedCreationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
+    }
+
+    return alreadyExistingMeasurements;
+  }
+
+  private void internalActivateTemplate(PartialPath devicePath) {
+    ExecutionResult executionResult =
+        statementExecutor.apply(new ActivateTemplateStatement(devicePath));
+    TSStatus status = executionResult.status;
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      throw new SemanticException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+  }
+
+  private void internalActivateTemplate(
+      Map<PartialPath, Pair<Template, PartialPath>> devicesNeedActivateTemplate) {
+    ExecutionResult executionResult =
+        statementExecutor.apply(
+            new InternalBatchActivateTemplateStatement(devicesNeedActivateTemplate));
+    TSStatus status = executionResult.status;
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      return;
+    }
+    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      Set<String> failedActivationSet = new HashSet<>();
+      for (TSStatus subStatus : status.subStatus) {
+        if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            && subStatus.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+          failedActivationSet.add(subStatus.message);
+        }
+      }
+      if (!failedActivationSet.isEmpty()) {
+        throw new SemanticException(new MetadataException(String.join("; ", failedActivationSet)));
+      }
+    } else {
+      throw new SemanticException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+  }
+
+  private void internalCreateTimeSeries(
+      ClusterSchemaTree schemaTree,
+      Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries) {
+
+    List<MeasurementPath> measurementPathList =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries));
+
+    schemaTree.appendMeasurementPaths(measurementPathList);
+
+    Map<PartialPath, Set<String>> alreadyExistingMeasurementMap = new HashMap<>();
+    for (MeasurementPath measurementPath : measurementPathList) {
+      alreadyExistingMeasurementMap
+          .computeIfAbsent(measurementPath.getDevicePath(), k -> new HashSet<>())
+          .add(measurementPath.getMeasurement());
+    }
+    Set<String> measurementSet;
+    MeasurementGroup measurementGroup;
+    for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry :
+        devicesNeedAutoCreateTimeSeries.entrySet()) {
+      measurementSet = alreadyExistingMeasurementMap.get(entry.getKey());
+      measurementGroup = entry.getValue().right;
+      for (int i = 0, size = measurementGroup.size(); i < size; i++) {
+        if (measurementSet != null
+            && measurementSet.contains(measurementGroup.getMeasurements().get(i))) {
+          continue;
+        }
+        schemaTree.appendSingleMeasurement(
+            entry.getKey().concatNode(measurementGroup.getMeasurements().get(i)),
+            new MeasurementSchema(
+                measurementGroup.getMeasurements().get(i),
+                measurementGroup.getDataTypes().get(i),
+                measurementGroup.getEncodings().get(i),
+                measurementGroup.getCompressors().get(i)),
+            null,
+            null,
+            entry.getValue().left);
+      }
+    }
+  }
+}
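
The batch variant of autoCreateMissingMeasurements above first folds the still-missing measurements of every target device into one per-device group before a single InternalCreateMultiTimeSeriesStatement is issued. A minimal standalone sketch of that grouping step, using only JDK collections and a hypothetical Group type standing in for MeasurementGroup (class and method names below are illustrative, not IoTDB code), could look like this:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BatchGroupingSketch {

      // Hypothetical stand-in for MeasurementGroup: parallel lists of names and types.
      static final class Group {
        final List<String> measurements = new ArrayList<>();
        final List<String> dataTypes = new ArrayList<>();
      }

      // Fold the missing measurements of all devices into one per-device batch.
      static Map<String, Group> groupMissingMeasurements(
          List<String> devicePaths,
          List<String[]> measurementsList,
          List<String[]> dataTypesList,
          List<List<Integer>> missingIndexList) {
        Map<String, Group> batch = new HashMap<>();
        for (int i = 0; i < devicePaths.size(); i++) {
          String device = devicePaths.get(i);
          String[] measurements = measurementsList.get(i);
          String[] dataTypes = dataTypesList.get(i);
          for (int index : missingIndexList.get(i)) {
            if (dataTypes[index] == null) {
              // a null data type means a null value was inserted into a
              // non-existent series, so there is nothing to create for it
              continue;
            }
            Group group = batch.computeIfAbsent(device, k -> new Group());
            group.measurements.add(measurements[index]);
            group.dataTypes.add(dataTypes[index]);
          }
        }
        return batch;
      }
    }

Once built, the per-device map is handed to one statement and later split per schema region by InternalCreateMultiTimeSeriesNode.splitByPartition, so the whole batch costs roughly one round trip per region instead of one per device.
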
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..9e8576a18c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class ClusterSchemaFetchExecutor {
+
+  private final Coordinator coordinator;
+  private final ITemplateManager templateManager;
+  private final Supplier<Long> queryIdProvider;
+  private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+  private final Consumer<ClusterSchemaTree> schemaCacheUpdater;
+
+  ClusterSchemaFetchExecutor(
+      Coordinator coordinator,
+      ITemplateManager templateManager,
+      Supplier<Long> queryIdProvider,
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+      Consumer<ClusterSchemaTree> schemaCacheUpdater) {
+    this.coordinator = coordinator;
+    this.templateManager = templateManager;
+    this.queryIdProvider = queryIdProvider;
+    this.statementExecutor = statementExecutor;
+    this.schemaCacheUpdater = schemaCacheUpdater;
+  }
+
+  /**
+   * This method is used for scenarios where the patternTree may contain wildcards or there is no
+   * need to cache the result.
+   */
+  ClusterSchemaTree fetchSchemaOfFuzzyMatch(PathPatternTree patternTree, boolean withTags) {
+    Map<Integer, Template> templateMap = new HashMap<>();
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    for (PartialPath pattern : pathPatternList) {
+      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
+    }
+    return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+  }
+
+  /**
+   * This method is used for scenarios where the patternTree has no wildcard and the result should
+   * be cached.
+   *
+   * @param fullPathList all the full paths, without wildcards, split from rawPatternTree
+   * @param rawPatternTree the pattern tree consisting of the paths in fullPathList
+   * @return the fetched schema
+   */
+  ClusterSchemaTree fetchSchemaOfPreciseMatch(
+      List<PartialPath> fullPathList, PathPatternTree rawPatternTree) {
+    ClusterSchemaTree schemaTree =
+        executeSchemaFetchQuery(
+            new SchemaFetchStatement(rawPatternTree, analyzeTemplate(fullPathList), false));
+    if (!schemaTree.isEmpty()) {
+      schemaCacheUpdater.accept(schemaTree);
+    }
+    return schemaTree;
+  }
+
+  ClusterSchemaTree fetchSchemaOfOneDevice(
+      PartialPath devicePath, String[] measurements, List<Integer> indexOfTargetMeasurements) {
+    PathPatternTree patternTree = new PathPatternTree();
+    for (int index : indexOfTargetMeasurements) {
+      patternTree.appendFullPath(devicePath, measurements[index]);
+    }
+    patternTree.constructTree();
+    return fetchSchemaAndCacheResult(patternTree);
+  }
+
+  ClusterSchemaTree fetchSchemaOfMultiDevices(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<Integer> indexOfTargetDevices,
+      List<List<Integer>> indexOfTargetMeasurementsList) {
+    PathPatternTree patternTree = new PathPatternTree();
+    int deviceIndex;
+    for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
+      deviceIndex = indexOfTargetDevices.get(i);
+      for (int measurementIndex : indexOfTargetMeasurementsList.get(i)) {
+        patternTree.appendFullPath(
+            devicePathList.get(deviceIndex), measurementsList.get(deviceIndex)[measurementIndex]);
+      }
+    }
+    patternTree.constructTree();
+    return fetchSchemaAndCacheResult(patternTree);
+  }
+
+  private ClusterSchemaTree fetchSchemaAndCacheResult(PathPatternTree patternTree) {
+    ClusterSchemaTree schemaTree =
+        executeSchemaFetchQuery(
+            new SchemaFetchStatement(
+                patternTree, analyzeTemplate(patternTree.getAllPathPatterns()), false));
+    if (!schemaTree.isEmpty()) {
+      schemaCacheUpdater.accept(schemaTree);
+    }
+    return schemaTree;
+  }
+
+  private Map<Integer, Template> analyzeTemplate(List<PartialPath> pathPatternList) {
+    Map<Integer, Template> templateMap = new HashMap<>();
+    for (PartialPath pattern : pathPatternList) {
+      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
+    }
+    return templateMap;
+  }
+
+  private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+    long queryId = queryIdProvider.get();
+    try {
+      ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
+      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException(
+            String.format(
+                "cannot fetch schema, status is: %s, msg is: %s",
+                executionResult.status.getCode(), executionResult.status.getMessage()));
+      }
+      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
+        ClusterSchemaTree result = new ClusterSchemaTree();
+        Set<String> databaseSet = new HashSet<>();
+        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+          // The query will transition to FINISHED when getBatchResult() is invoked for the last
+          // time, so we don't need to clean it up manually
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
+          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+            break;
+          }
+          Column column = tsBlock.get().getColumn(0);
+          for (int i = 0; i < column.getPositionCount(); i++) {
+            parseFetchedData(column.getBinary(i), result, databaseSet);
+          }
+        }
+        result.setDatabases(databaseSet);
+        return result;
+      }
+    } finally {
+      coordinator.cleanupQueryExecution(queryId);
+    }
+  }
+
+  private void parseFetchedData(
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of unrecognized data"));
+      }
+    } catch (IOException e) {
+      // This is a purely in-memory operation, so this exception should never occur.
+    }
+  }
+}
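
parseFetchedData above dispatches on a leading type byte: 0 introduces a list of database names, 1 a serialized schema tree. The same tag-dispatch idea can be shown with a self-contained JDK-only sketch; the concrete byte layout below (an int count plus UTF strings) is a simplification for illustration and not IoTDB's actual wire format:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.List;
    import java.util.Set;

    public class TaggedPayloadSketch {

      // Parse one tagged payload: tag 0 carries database names, tag 1 carries a
      // serialized schema blob (represented here as a single UTF string).
      static void parse(byte[] data, Set<String> databases, List<String> schemaBlobs)
          throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
          byte tag = in.readByte();
          if (tag == 0) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
              databases.add(in.readUTF());
            }
          } else if (tag == 1) {
            schemaBlobs.add(in.readUTF());
          } else {
            throw new IllegalStateException("unrecognized payload tag: " + tag);
          }
        }
      }
    }
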
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
new file mode 100644
index 0000000000..09868e6b9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ClusterSchemaFetcher implements ISchemaFetcher {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private final Coordinator coordinator = Coordinator.getInstance();
+  private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
+  private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
+
+  private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+      new AutoCreateSchemaExecutor(
+          templateManager,
+          statement -> {
+            long queryId = SessionManager.getInstance().requestQueryId();
+            return coordinator.execute(
+                statement,
+                queryId,
+                null,
+                "",
+                ClusterPartitionFetcher.getInstance(),
+                this,
+                config.getQueryTimeoutThreshold());
+          });
+  private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+      new ClusterSchemaFetchExecutor(
+          coordinator,
+          templateManager,
+          () -> SessionManager.getInstance().requestQueryId(),
+          (queryId, statement) ->
+              coordinator.execute(
+                  statement,
+                  queryId,
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()),
+          schemaCache::put);
+
+  private static final class ClusterSchemaFetcherHolder {
+    private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
+
+    private ClusterSchemaFetcherHolder() {}
+  }
+
+  public static ClusterSchemaFetcher getInstance() {
+    return ClusterSchemaFetcherHolder.INSTANCE;
+  }
+
+  private ClusterSchemaFetcher() {}
+
+  @Override
+  public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    List<PartialPath> fullPathList = new ArrayList<>();
+    for (PartialPath pattern : pathPatternList) {
+      if (!pattern.hasWildcard()) {
+        fullPathList.add(pattern);
+      }
+    }
+
+    if (fullPathList.size() < pathPatternList.size()) {
+      return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, false);
+    }
+
+    // The schema cache R/W and the fetch operation must be locked together so that the cache
+    // clean operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree;
+      if (fullPathList.size() == pathPatternList.size()) {
+        boolean isAllCached = true;
+        schemaTree = new ClusterSchemaTree();
+        ClusterSchemaTree cachedSchema;
+        Set<String> storageGroupSet = new HashSet<>();
+        for (PartialPath fullPath : fullPathList) {
+          cachedSchema = schemaCache.get(fullPath);
+          if (cachedSchema.isEmpty()) {
+            isAllCached = false;
+            break;
+          } else {
+            schemaTree.mergeSchemaTree(cachedSchema);
+            storageGroupSet.addAll(cachedSchema.getDatabases());
+          }
+        }
+        if (isAllCached) {
+          schemaTree.setDatabases(storageGroupSet);
+          return schemaTree;
+        }
+      }
+
+      return clusterSchemaFetchExecutor.fetchSchemaOfPreciseMatch(fullPathList, patternTree);
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, true);
+  }
+
+  @Override
+  public ISchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean isAligned) {
+    // The schema cache R/W and the fetch operation must be locked together so that the cache
+    // clean operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
+      List<Integer> indexOfMissingMeasurements =
+          checkMissingMeasurements(schemaTree, devicePath, measurements);
+
+      // all schemas can be taken from the cache
+      if (indexOfMissingMeasurements.isEmpty()) {
+        return schemaTree;
+      }
+
+      // try to fetch the missing schema from remote and cache the fetched schema
+      ClusterSchemaTree remoteSchemaTree =
+          clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+              devicePath, measurements, indexOfMissingMeasurements);
+      if (!remoteSchemaTree.isEmpty()) {
+        schemaTree.mergeSchemaTree(remoteSchemaTree);
+      }
+
+      if (!config.isAutoCreateSchemaEnabled()) {
+        return schemaTree;
+      }
+
+      // auto create the still-missing schema and merge it into schemaTree
+      indexOfMissingMeasurements =
+          checkMissingMeasurementsAfterSchemaFetch(
+              schemaTree, devicePath, indexOfMissingMeasurements, measurements);
+      if (!indexOfMissingMeasurements.isEmpty()) {
+        autoCreateSchemaExecutor.autoCreateMissingMeasurements(
+            schemaTree,
+            devicePath,
+            indexOfMissingMeasurements,
+            measurements,
+            getDataType,
+            isAligned);
+      }
+
+      return schemaTree;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public ISchemaTree fetchSchemaListWithAutoCreate(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<Boolean> isAlignedList) {
+    return fetchSchemaListWithAutoCreate(
+        devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList);
+  }
+
+  @Override
+  public ISchemaTree fetchSchemaListWithAutoCreate(
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<TSEncoding[]> encodingsList,
+      List<CompressionType[]> compressionTypesList,
+      List<Boolean> isAlignedList) {
+    // The schema cache R/W and the fetch operation must be locked together so that the cache
+    // clean operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+      List<List<Integer>> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size());
+      List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
+      for (int i = 0; i < devicePathList.size(); i++) {
+        schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
+        List<Integer> indexOfMissingMeasurements =
+            checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
+        if (!indexOfMissingMeasurements.isEmpty()) {
+          indexOfDevicesWithMissingMeasurements.add(i);
+          indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+        }
+      }
+
+      // all schemas can be taken from the cache
+      if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
+        return schemaTree;
+      }
+
+      // try to fetch the missing schema from remote and cache the fetched schema
+      ClusterSchemaTree remoteSchemaTree =
+          clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
+              devicePathList,
+              measurementsList,
+              indexOfDevicesWithMissingMeasurements,
+              indexOfMissingMeasurementsList);
+      if (!remoteSchemaTree.isEmpty()) {
+        schemaTree.mergeSchemaTree(remoteSchemaTree);
+      }
+
+      if (!config.isAutoCreateSchemaEnabled()) {
+        return schemaTree;
+      }
+
+      // auto create the still-missing schema and merge it into schemaTree
+      List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
+      List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
+      List<Integer> indexOfMissingMeasurements;
+      int deviceIndex;
+      for (int i = 0, size = indexOfDevicesWithMissingMeasurements.size(); i < size; i++) {
+        deviceIndex = indexOfDevicesWithMissingMeasurements.get(i);
+        indexOfMissingMeasurements = indexOfMissingMeasurementsList.get(i);
+        indexOfMissingMeasurements =
+            checkMissingMeasurementsAfterSchemaFetch(
+                schemaTree,
+                devicePathList.get(deviceIndex),
+                indexOfMissingMeasurements,
+                measurementsList.get(deviceIndex));
+        if (!indexOfMissingMeasurements.isEmpty()) {
+          indexOfDevicesNeedAutoCreateSchema.add(deviceIndex);
+          indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
+        }
+      }
+
+      if (!indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
+        autoCreateSchemaExecutor.autoCreateMissingMeasurements(
+            schemaTree,
+            devicePathList,
+            indexOfDevicesNeedAutoCreateSchema,
+            indexOfMeasurementsNeedAutoCreate,
+            measurementsList,
+            tsDataTypesList,
+            encodingsList,
+            compressionTypesList,
+            isAlignedList);
+      }
+
+      return schemaTree;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+    return templateManager.checkTemplateSetInfo(path);
+  }
+
+  @Override
+  public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
+    return templateManager.checkAllRelatedTemplate(pathPattern);
+  }
+
+  @Override
+  public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
+    return templateManager.getAllPathsSetTemplate(templateName);
+  }
+
+  private List<Integer> checkMissingMeasurements(
+      ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+    if (deviceSchemaInfo == null) {
+      return IntStream.range(0, measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+    for (int i = 0; i < measurements.length; i++) {
+      if (schemaList.get(i) == null) {
+        indexOfMissingMeasurements.add(i);
+      }
+    }
+
+    return indexOfMissingMeasurements;
+  }
+
+  private List<Integer> checkMissingMeasurementsAfterSchemaFetch(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(
+            devicePath,
+            indexOfTargetMeasurements.stream()
+                .map(index -> measurements[index])
+                .collect(Collectors.toList()));
+    if (deviceSchemaInfo == null) {
+      return indexOfTargetMeasurements;
+    }
+
+    List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+    for (int i = 0, size = schemaList.size(); i < size; i++) {
+      if (schemaList.get(i) == null) {
+        indexOfMissingMeasurements.add(indexOfTargetMeasurements.get(i));
+      }
+    }
+
+    return indexOfMissingMeasurements;
+  }
+
+  @Override
+  public void invalidAllCache() {
+    DataNodeSchemaCache.getInstance().cleanUp();
+  }
+}
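
fetchSchemaWithAutoCreate above follows a read-through pattern: consult the local cache, compute which measurement indexes are still missing, fetch only those from the cluster, and finally hand anything still absent to the auto-create path. A compact standalone sketch of the index bookkeeping, with plain Maps as hypothetical stand-ins for DataNodeSchemaCache and the remote fetch, is:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    public class ReadThroughSketch {

      // Return the indexes of measurements that the given lookup cannot resolve.
      static List<Integer> missingIndexes(
          String[] measurements, Function<String, String> lookup) {
        List<Integer> missing = new ArrayList<>();
        for (int i = 0; i < measurements.length; i++) {
          if (lookup.apply(measurements[i]) == null) {
            missing.add(i);
          }
        }
        return missing;
      }

      // Cache first, then remote, then auto-create whatever is still unresolved.
      static List<Integer> resolve(
          String[] measurements, Map<String, String> cache, Map<String, String> remote) {
        List<Integer> missing = missingIndexes(measurements, cache::get);
        if (missing.isEmpty()) {
          return missing; // every schema was served from the cache
        }
        for (int index : missing) {
          String fetched = remote.get(measurements[index]);
          if (fetched != null) {
            cache.put(measurements[index], fetched); // cache the fetched schema
          }
        }
        // whatever is still missing at this point would go to the auto-create executor
        return missingIndexes(measurements, cache::get);
      }
    }

In the real fetcher both phases additionally run under the schema cache read lock, so that a cache invalidation triggered by delete timeseries remains effective against concurrent reads.
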
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 5ec75fe1da..37516321c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 391aed20dc..31b7472b44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
index 7e88374da7..58d3a5369c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
@@ -150,4 +150,7 @@ public enum StatementType {
   SHOW_TRIGGERS,
 
   DEACTIVATE_TEMPLATE,
+
+  INTERNAL_BATCH_ACTIVATE_TEMPLATE,
+  INTERNAL_CREATE_MULTI_TIMESERIES
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index e7107c06df..6a399b6c70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index cbbc9d4131..bfa4e3d0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner;
 
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
@@ -29,6 +31,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSe
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
@@ -49,6 +53,8 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -65,8 +71,10 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -339,6 +347,15 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
         internalCreateTimeSeriesStatement.isAligned());
   }
 
+  @Override
+  public PlanNode visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
+      MPPQueryContext context) {
+    return new InternalCreateMultiTimeSeriesNode(
+        context.getQueryId().genPlanNodeId(),
+        internalCreateMultiTimeSeriesStatement.getDeviceMap());
+  }
+
   @Override
   public PlanNode visitCreateMultiTimeseries(
       CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
@@ -689,6 +706,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
         analysis.getTemplateSetInfo().left.getId());
   }
 
+  @Override
+  public PlanNode visitInternalBatchActivateTemplate(
+      InternalBatchActivateTemplateStatement internalBatchActivateTemplateStatement,
+      MPPQueryContext context) {
+    Map<PartialPath, Pair<Integer, Integer>> templateActivationMap = new HashMap<>();
+    for (Map.Entry<PartialPath, Pair<Template, PartialPath>> entry :
+        internalBatchActivateTemplateStatement.getDeviceMap().entrySet()) {
+      templateActivationMap.put(
+          entry.getKey(),
+          new Pair<>(entry.getValue().left.getId(), entry.getValue().right.getNodeLength() - 1));
+    }
+    return new InternalBatchActivateTemplateNode(
+        context.getQueryId().genPlanNodeId(), templateActivationMap);
+  }
+
   @Override
   public PlanNode visitShowPathsUsingTemplate(
       ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 0f846ad25b..5529e3cdf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMulti
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InvalidateSchemaCacheNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
@@ -152,7 +154,9 @@ public enum PlanNodeType {
   DEACTIVATE_TEMPLATE_NODE((short) 61),
   INTO((short) 62),
   DEVICE_VIEW_INTO((short) 63),
-  VERTICALLY_CONCAT((short) 64);
+  VERTICALLY_CONCAT((short) 64),
+  INTERNAL_BATCH_ACTIVATE_TEMPLATE((short) 68),
+  INTERNAL_CREATE_MULTI_TIMESERIES((short) 69);
 
   public static final int BYTES = Short.BYTES;
 
@@ -331,6 +335,10 @@ public enum PlanNodeType {
         return DeviceViewIntoNode.deserialize(buffer);
       case 64:
         return VerticallyConcatNode.deserialize(buffer);
+      case 68:
+        return InternalBatchActivateTemplateNode.deserialize(buffer);
+      case 69:
+        return InternalCreateMultiTimeSeriesNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8bf41c86ec..86f8cbf946 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -41,6 +41,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMulti
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode;
@@ -331,4 +333,12 @@ public abstract class PlanVisitor<R, C> {
   public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitInternalBatchActivateTemplate(InternalBatchActivateTemplateNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitInternalCreateMultiTimeSeries(InternalCreateMultiTimeSeriesNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
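
The two new visit methods in PlanVisitor fall back to visitPlan, so existing visitor subclasses keep compiling and only those that care about the new nodes override them. The pattern in isolation, with placeholder names rather than IoTDB types, is simply:

    // Minimal illustration of the visitor-with-default pattern used above;
    // the node parameters are plain Objects here, not IoTDB plan node classes.
    public abstract class Visitor<R, C> {
      public abstract R visitPlan(Object node, C context);

      // New node types get a default that delegates to visitPlan, so subclasses
      // written before these nodes existed remain source-compatible.
      public R visitNodeA(Object node, C context) {
        return visitPlan(node, context);
      }

      public R visitNodeB(Object node, C context) {
        return visitPlan(node, context);
      }
    }
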
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
new file mode 100644
index 0000000000..d705ba08fa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalBatchActivateTemplateNode extends WritePlanNode {
+
+  // devicePath -> <templateId, templateSetLevel>
+  private Map<PartialPath, Pair<Integer, Integer>> templateActivationMap;
+
+  private TRegionReplicaSet regionReplicaSet;
+
+  public InternalBatchActivateTemplateNode(
+      PlanNodeId id, Map<PartialPath, Pair<Integer, Integer>> templateActivationMap) {
+    super(id);
+    this.templateActivationMap = templateActivationMap;
+  }
+
+  private InternalBatchActivateTemplateNode(
+      PlanNodeId id,
+      Map<PartialPath, Pair<Integer, Integer>> templateActivationMap,
+      TRegionReplicaSet regionReplicaSet) {
+    super(id);
+    this.templateActivationMap = templateActivationMap;
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public Map<PartialPath, Pair<Integer, Integer>> getTemplateActivationMap() {
+    return templateActivationMap;
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {}
+
+  @Override
+  public PlanNode clone() {
+    return new InternalBatchActivateTemplateNode(
+        getPlanNodeId(), templateActivationMap, regionReplicaSet);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return 0;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.INTERNAL_BATCH_ACTIVATE_TEMPLATE.serialize(byteBuffer);
+
+    int size = templateActivationMap.size();
+    ReadWriteIOUtils.write(size, byteBuffer);
+    for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : templateActivationMap.entrySet()) {
+      entry.getKey().serialize(byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().left, byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().right, byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.INTERNAL_BATCH_ACTIVATE_TEMPLATE.serialize(stream);
+
+    int size = templateActivationMap.size();
+    ReadWriteIOUtils.write(size, stream);
+    for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : templateActivationMap.entrySet()) {
+      entry.getKey().serialize(stream);
+      ReadWriteIOUtils.write(entry.getValue().left, stream);
+      ReadWriteIOUtils.write(entry.getValue().right, stream);
+    }
+  }
+
+  public static InternalBatchActivateTemplateNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<PartialPath, Pair<Integer, Integer>> templateActivationMap = new HashMap<>(size);
+    for (int i = 0; i < size; i++) {
+      templateActivationMap.put(
+          (PartialPath) PathDeserializeUtil.deserialize(byteBuffer),
+          new Pair<>(ReadWriteIOUtils.readInt(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer)));
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new InternalBatchActivateTemplateNode(planNodeId, templateActivationMap);
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    // group devices that belong to the same target region
+    Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap = new HashMap<>();
+    for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : templateActivationMap.entrySet()) {
+      TRegionReplicaSet regionReplicaSet =
+          analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getFullPath());
+      splitMap
+          .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
+    }
+
+    List<WritePlanNode> result = new ArrayList<>();
+    for (Map.Entry<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> entry :
+        splitMap.entrySet()) {
+      result.add(
+          new InternalBatchActivateTemplateNode(getPlanNodeId(), entry.getValue(), entry.getKey()));
+    }
+
+    return result;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitInternalBatchActivateTemplate(this, context);
+  }
+}
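
InternalBatchActivateTemplateNode serializes its map as a size followed by (path, templateId, templateSetLevel) entries and reads the same layout back in deserialize. That write/read symmetry can be illustrated with a JDK-only ByteBuffer round trip; String keys and a fixed buffer capacity are simplifications of this sketch, not properties of the real node:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class MapRoundTripSketch {

      // Write: entry count, then for each entry the key bytes plus two ints.
      static ByteBuffer serialize(Map<String, int[]> activationMap) {
        ByteBuffer buffer = ByteBuffer.allocate(4096); // enough for this sketch
        buffer.putInt(activationMap.size());
        for (Map.Entry<String, int[]> entry : activationMap.entrySet()) {
          byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8);
          buffer.putInt(key.length);
          buffer.put(key);
          buffer.putInt(entry.getValue()[0]); // templateId
          buffer.putInt(entry.getValue()[1]); // templateSetLevel
        }
        buffer.flip();
        return buffer;
      }

      // Read the same layout back into a map.
      static Map<String, int[]> deserialize(ByteBuffer buffer) {
        int size = buffer.getInt();
        Map<String, int[]> activationMap = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
          byte[] key = new byte[buffer.getInt()];
          buffer.get(key);
          activationMap.put(
              new String(key, StandardCharsets.UTF_8),
              new int[] {buffer.getInt(), buffer.getInt()});
        }
        return activationMap;
      }
    }
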
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
new file mode 100644
index 0000000000..40cd224e2f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalCreateMultiTimeSeriesNode extends WritePlanNode {
+
+  private Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap;
+
+  private TRegionReplicaSet regionReplicaSet;
+
+  public InternalCreateMultiTimeSeriesNode(
+      PlanNodeId id, Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap) {
+    super(id);
+    this.deviceMap = deviceMap;
+  }
+
+  private InternalCreateMultiTimeSeriesNode(
+      PlanNodeId id,
+      Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap,
+      TRegionReplicaSet regionReplicaSet) {
+    super(id);
+    this.deviceMap = deviceMap;
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public Map<PartialPath, Pair<Boolean, MeasurementGroup>> getDeviceMap() {
+    return deviceMap;
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {}
+
+  @Override
+  public PlanNode clone() {
+    return new InternalCreateMultiTimeSeriesNode(getPlanNodeId(), deviceMap, regionReplicaSet);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return 0;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.INTERNAL_CREATE_MULTI_TIMESERIES.serialize(byteBuffer);
+
+    ReadWriteIOUtils.write(deviceMap.size(), byteBuffer);
+    for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry : deviceMap.entrySet()) {
+      entry.getKey().serialize(byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().left, byteBuffer);
+      entry.getValue().right.serialize(byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.INTERNAL_CREATE_MULTI_TIMESERIES.serialize(stream);
+
+    ReadWriteIOUtils.write(deviceMap.size(), stream);
+    for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry : deviceMap.entrySet()) {
+      entry.getKey().serialize(stream);
+      ReadWriteIOUtils.write(entry.getValue().left, stream);
+      entry.getValue().right.serialize(stream);
+    }
+  }
+
+  public static InternalCreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap = new HashMap<>(size);
+    PartialPath devicePath;
+    boolean isAligned;
+    MeasurementGroup measurementGroup;
+    for (int i = 0; i < size; i++) {
+      devicePath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+      isAligned = ReadWriteIOUtils.readBool(byteBuffer);
+      measurementGroup = new MeasurementGroup();
+      measurementGroup.deserialize(byteBuffer);
+      deviceMap.put(devicePath, new Pair<>(isAligned, measurementGroup));
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new InternalCreateMultiTimeSeriesNode(planNodeId, deviceMap);
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    // group devices whose paths map to the same target schema region
+    Map<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>> splitMap =
+        new HashMap<>();
+    for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry : deviceMap.entrySet()) {
+      TRegionReplicaSet regionReplicaSet =
+          analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getFullPath());
+      splitMap
+          .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
+    }
+
+    List<WritePlanNode> result = new ArrayList<>();
+    for (Map.Entry<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>> entry :
+        splitMap.entrySet()) {
+      result.add(
+          new InternalCreateMultiTimeSeriesNode(getPlanNodeId(), entry.getValue(), entry.getKey()));
+    }
+
+    return result;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitInternalCreateMultiTimeSeries(this, context);
+  }
+}
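
A minimal construction-and-round-trip sketch for the new node, under a few assumptions: the usual PlanNode.serialize(ByteBuffer) / PlanNodeType.deserialize(ByteBuffer) entry points, a MeasurementGroup left empty (its population API is not shown in this patch excerpt), and an illustrative path, plan-node id and buffer size:

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.iotdb.commons.exception.IllegalPathException;
    import org.apache.iotdb.commons.path.PartialPath;
    import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
    import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
    import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
    import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
    import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
    import org.apache.iotdb.tsfile.utils.Pair;

    public class InternalCreateMultiTimeSeriesNodeSketch {
      public static void main(String[] args) throws IllegalPathException {
        // One aligned device carrying an (empty) measurement group.
        Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap = new HashMap<>();
        deviceMap.put(new PartialPath("root.sg.d1"), new Pair<>(true, new MeasurementGroup()));

        InternalCreateMultiTimeSeriesNode node =
            new InternalCreateMultiTimeSeriesNode(new PlanNodeId("sketch-0"), deviceMap);

        // Round trip: type code + attributes + PlanNodeId, matching
        // serializeAttributes(...) and deserialize(...) above.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        node.serialize(buffer);
        buffer.flip();
        PlanNode roundTripped = PlanNodeType.deserialize(buffer);
        System.out.println(
            ((InternalCreateMultiTimeSeriesNode) roundTripped).getDeviceMap().size()); // 1
      }
    }
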
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index ebceb0e10b..cf7dc610d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 4d55216dfe..0d433a5901 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -450,4 +452,14 @@ public abstract class StatementVisitor<R, C> {
       DropSchemaTemplateStatement dropSchemaTemplateStatement, C context) {
     return visitStatement(dropSchemaTemplateStatement, context);
   }
+
+  public R visitInternalBatchActivateTemplate(
+      InternalBatchActivateTemplateStatement internalBatchActivateTemplateStatement, C context) {
+    return visitStatement(internalBatchActivateTemplateStatement, context);
+  }
+
+  public R visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, C context) {
+    return visitStatement(internalCreateMultiTimeSeriesStatement, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalBatchActivateTemplateStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalBatchActivateTemplateStatement.java
new file mode 100644
index 0000000000..8e8008c68d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalBatchActivateTemplateStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.internal;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+// Only used internally to auto-activate templates on multiple devices while inserting data
+public class InternalBatchActivateTemplateStatement extends Statement {
+
+  // devicePath -> <Template, TemplateSetPath>
+  private Map<PartialPath, Pair<Template, PartialPath>> deviceMap;
+
+  public InternalBatchActivateTemplateStatement(
+      Map<PartialPath, Pair<Template, PartialPath>> deviceMap) {
+    super();
+    setType(StatementType.INTERNAL_BATCH_ACTIVATE_TEMPLATE);
+    this.deviceMap = deviceMap;
+  }
+
+  public Map<PartialPath, Pair<Template, PartialPath>> getDeviceMap() {
+    return deviceMap;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return new ArrayList<>(deviceMap.keySet());
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitInternalBatchActivateTemplate(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalCreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalCreateMultiTimeSeriesStatement.java
new file mode 100644
index 0000000000..828b431cfe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalCreateMultiTimeSeriesStatement.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.internal;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class InternalCreateMultiTimeSeriesStatement extends Statement {
+
+  private Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap;
+
+  public InternalCreateMultiTimeSeriesStatement(
+      Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap) {
+    super();
+    statementType = StatementType.INTERNAL_CREATE_MULTI_TIMESERIES;
+    this.deviceMap = deviceMap;
+  }
+
+  public Map<PartialPath, Pair<Boolean, MeasurementGroup>> getDeviceMap() {
+    return deviceMap;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return new ArrayList<>(deviceMap.keySet());
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitInternalCreateMultiTimeSeries(this, context);
+  }
+}
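
This statement carries the same device map shape as InternalCreateMultiTimeSeriesNode and is presumably its statement-level counterpart produced before planning. A small construction sketch, again with illustrative paths and empty MeasurementGroup placeholders:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.iotdb.commons.exception.IllegalPathException;
    import org.apache.iotdb.commons.path.PartialPath;
    import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
    import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
    import org.apache.iotdb.tsfile.utils.Pair;

    public class InternalCreateMultiTimeSeriesStatementSketch {
      public static void main(String[] args) throws IllegalPathException {
        Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap = new HashMap<>();
        deviceMap.put(new PartialPath("root.sg.d1"), new Pair<>(false, new MeasurementGroup()));
        deviceMap.put(new PartialPath("root.sg.d2"), new Pair<>(true, new MeasurementGroup()));

        InternalCreateMultiTimeSeriesStatement statement =
            new InternalCreateMultiTimeSeriesStatement(deviceMap);

        // getPaths() exposes one device path per map entry.
        System.out.println(statement.getPaths().size()); // 2
      }
    }
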
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index cecee37dfb..6a30d70799 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
index 4641456bdc..d927d18042 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
index 4892e13da7..1c29a6ee52 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index ce21ce89fd..648c3f5903 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -27,11 +27,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index daf3de06d5..a52a9e4983 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -36,11 +36,11 @@ import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 80bf535a27..bf0a312c8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -77,11 +77,11 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 3706fbfd99..dad1f162a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.qp.utils.DateTimeUtils;
 import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
index 754893d562..1d6509191b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 
 /**
  * This interface is used to load files, including tsFile, syncTask, schema, modsFile and
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index 2f05dddf14..8ae5dba86d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index f468e09d9b..89430bc91f 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index abaff37ba7..ffb3902c9c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 61951feee4..a65b83d03a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index a4a3c6c580..7c908b2f4d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -46,7 +46,7 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;