You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/15 05:03:13 UTC

[iotdb] branch master updated: [IOTDB-3172] Support CreateMultTimeseries API for new cluster (#5896)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3857fab2b1 [IOTDB-3172] Support CreateMultTimeseries API for new cluster (#5896)
3857fab2b1 is described below

commit 3857fab2b13f0af282032b8f28cc2035c3f50a27
Author: Haonan <hh...@outlook.com>
AuthorDate: Sun May 15 13:03:07 2022 +0800

    [IOTDB-3172] Support CreateMultTimeseries API for new cluster (#5896)
---
 .../metadata/visitor/SchemaExecutionVisitor.java   |  50 +++
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  15 +
 .../db/mpp/plan/parser/StatementGenerator.java     |  33 ++
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  17 +
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../metedata/write/CreateMultiTimeSeriesNode.java  | 453 +++++++++++++++++++++
 .../db/mpp/plan/statement/StatementVisitor.java    |   7 +
 .../metadata/CreateMultiTimeSeriesStatement.java   | 143 +++++++
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  43 +-
 .../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java |  88 ++++
 .../iotdb/db/service/InternalServiceImplTest.java  | 100 +++++
 12 files changed, 957 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 9988e8b8da..b6b55c731b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -30,9 +30,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -42,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 /** Schema write PlanNode visitor */
 public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@@ -72,6 +75,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
   }
 
+  @Override
+  public TSStatus visitCreateMultiTimeSeries(
+      CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    CreateMultiTimeSeriesPlan multiPlan =
+        (CreateMultiTimeSeriesPlan)
+            node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+    for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+      if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+        continue;
+      }
+      CreateTimeSeriesPlan plan =
+          new CreateTimeSeriesPlan(
+              multiPlan.getPaths().get(i),
+              multiPlan.getDataTypes().get(i),
+              multiPlan.getEncodings().get(i),
+              multiPlan.getCompressors().get(i),
+              multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+              multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+              multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+              multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+      try {
+        schemaRegion.createTimeseries(plan, -1);
+      } catch (MetadataException e) {
+        logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+        multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!multiPlan.getResults().isEmpty()) {
+      return RpcUtils.getStatus(Arrays.asList(multiPlan.getFailingStatus()));
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+  }
+
   @Override
   public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
     try {
@@ -146,6 +182,20 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
           node.getTagsList(),
           node.getAttributesList());
     }
+
+    public PhysicalPlan visitCreateMultiTimeSeries(
+        CreateMultiTimeSeriesNode node, TransformerContext context) {
+      CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
+      multiPlan.setPaths(node.getPaths());
+      multiPlan.setDataTypes(node.getDataTypes());
+      multiPlan.setEncodings(node.getEncodings());
+      multiPlan.setCompressors(node.getCompressors());
+      multiPlan.setProps(node.getPropsList());
+      multiPlan.setAlias(node.getAliasList());
+      multiPlan.setTags(node.getTagsList());
+      multiPlan.setAttributes(node.getAttributesList());
+      return multiPlan;
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 9522387207..fd6053fd20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -854,6 +855,20 @@ public class Analyzer {
       return analysis;
     }
 
+    @Override
+    public Analysis visitCreateMultiTimeseries(
+        CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
+      Analysis analysis = new Analysis();
+      analysis.setStatement(createMultiTimeSeriesStatement);
+
+      SchemaPartition schemaPartitionInfo =
+          partitionFetcher.getOrCreateSchemaPartition(
+              new PathPatternTree(createMultiTimeSeriesStatement.getPaths()));
+      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+      return analysis;
+    }
+
     @Override
     public Analysis visitAlterTimeseries(
         AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 51726de6ca..fdd5f36f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.qp.strategy.SQLParseError;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -373,6 +375,37 @@ public class StatementGenerator {
     return statement;
   }
 
+  public static Statement createStatement(TSCreateMultiTimeseriesReq req)
+      throws IllegalPathException {
+    // construct create multi timeseries statement
+    List<PartialPath> paths = new ArrayList<>();
+    for (String path : req.paths) {
+      paths.add(new PartialPath(path));
+    }
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (int dataType : req.dataTypes) {
+      dataTypes.add(TSDataType.values()[dataType]);
+    }
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (int encoding : req.encodings) {
+      encodings.add(TSEncoding.values()[encoding]);
+    }
+    List<CompressionType> compressors = new ArrayList<>();
+    for (int compressor : req.compressors) {
+      compressors.add(CompressionType.values()[compressor]);
+    }
+    CreateMultiTimeSeriesStatement statement = new CreateMultiTimeSeriesStatement();
+    statement.setPaths(paths);
+    statement.setDataTypes(dataTypes);
+    statement.setEncodings(encodings);
+    statement.setCompressors(compressors);
+    statement.setPropsList(req.propsList);
+    statement.setTagsList(req.tagsList);
+    statement.setAttributesList(req.attributesList);
+    statement.setAliasList(req.measurementAliasList);
+    return statement;
+  }
+
   private static Statement invokeParser(String sql, ZoneId zoneId) {
     ASTVisitor astVisitor = new ASTVisitor();
     astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index befcdb71f4..57d0854b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -281,6 +283,21 @@ public class LogicalPlanner {
           createAlignedTimeSeriesStatement.getAttributesList());
     }
 
+    @Override
+    public PlanNode visitCreateMultiTimeseries(
+        CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+      return new CreateMultiTimeSeriesNode(
+          context.getQueryId().genPlanNodeId(),
+          createMultiTimeSeriesStatement.getPaths(),
+          createMultiTimeSeriesStatement.getDataTypes(),
+          createMultiTimeSeriesStatement.getEncodings(),
+          createMultiTimeSeriesStatement.getCompressors(),
+          createMultiTimeSeriesStatement.getPropsList(),
+          createMultiTimeSeriesStatement.getAliasList(),
+          createMultiTimeSeriesStatement.getTagsList(),
+          createMultiTimeSeriesStatement.getAttributesList());
+    }
+
     @Override
     public PlanNode visitAlterTimeseries(
         AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 1d52cd0231..4fe62bad82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -99,7 +100,8 @@ public enum PlanNodeType {
   DEVICE_MERGE((short) 35),
   SCHEMA_FETCH_MERGE((short) 36),
   TRANSFORM((short) 37),
-  DELETE_REGION((short) 38);
+  DELETE_REGION((short) 38),
+  CREATE_MULTI_TIME_SERIES((short) 39);
 
   private final short nodeType;
 
@@ -201,6 +203,8 @@ public enum PlanNodeType {
         return TransformNode.deserialize(buffer);
       case 38:
         return DeleteRegionNode.deserialize(buffer);
+      case 39:
+        return CreateMultiTimeSeriesNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 959cfa3a62..b11dac5a26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
@@ -184,6 +185,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
new file mode 100644
index 0000000000..52fa45a411
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateMultiTimeSeriesNode extends WritePlanNode {
+  private List<PartialPath> paths = new ArrayList<>();
+  private List<TSDataType> dataTypes = new ArrayList<>();
+  private List<TSEncoding> encodings = new ArrayList<>();
+  private List<CompressionType> compressors = new ArrayList<>();
+  private List<String> aliasList;
+  private List<Map<String, String>> propsList;
+  private List<Map<String, String>> tagsList;
+  private List<Map<String, String>> attributesList;
+  private List<Long> tagOffsets;
+  private TRegionReplicaSet regionReplicaSet;
+
+  public CreateMultiTimeSeriesNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public CreateMultiTimeSeriesNode(
+      PlanNodeId id,
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<Map<String, String>> propsList,
+      List<String> aliasList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList) {
+    super(id);
+    this.paths = paths;
+    this.dataTypes = dataTypes;
+    this.encodings = encodings;
+    this.compressors = compressors;
+    this.propsList = propsList;
+    this.aliasList = aliasList;
+    this.tagsList = tagsList;
+    this.attributesList = attributesList;
+  }
+
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<PartialPath> paths) {
+    this.paths = paths;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  public List<Map<String, String>> getPropsList() {
+    return propsList;
+  }
+
+  public void setPropsList(List<Map<String, String>> propsList) {
+    this.propsList = propsList;
+  }
+
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  public List<Long> getTagOffsets() {
+    return tagOffsets;
+  }
+
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
+  public void addTimeSeries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes) {
+    this.paths.add(path);
+    this.dataTypes.add(dataType);
+    this.encodings.add(encoding);
+    this.compressors.add(compressor);
+    if (props != null) {
+      if (this.propsList == null) {
+        propsList = new ArrayList<>();
+      }
+      propsList.add(props);
+    }
+    if (alias != null) {
+      if (this.aliasList == null) {
+        aliasList = new ArrayList<>();
+      }
+      aliasList.add(alias);
+    }
+    if (tags != null) {
+      if (this.tagsList == null) {
+        tagsList = new ArrayList<>();
+      }
+      tagsList.add(tags);
+    }
+    if (attributes != null) {
+      if (this.attributesList == null) {
+        attributesList = new ArrayList<>();
+      }
+      attributesList.add(attributes);
+    }
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {}
+
+  @Override
+  public PlanNode clone() {
+    throw new NotImplementedException("Clone of CreateMultiTimeSeriesNode is not implemented");
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+    return visitor.visitCreateMultiTimeSeries(this, schemaRegion);
+  }
+
+  public static CreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
+    String id;
+    List<PartialPath> paths;
+    List<TSDataType> dataTypes;
+    List<TSEncoding> encodings;
+    List<CompressionType> compressors;
+    List<String> aliasList = null;
+    List<Map<String, String>> propsList = null;
+    List<Map<String, String>> tagsList = null;
+    List<Map<String, String>> attributesList = null;
+
+    int size = byteBuffer.getInt();
+    paths = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      try {
+        paths.add(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+      } catch (IllegalPathException e) {
+        throw new IllegalArgumentException("Can not deserialize CreateMultiTimeSeriesNode", e);
+      }
+    }
+
+    dataTypes = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      dataTypes.add(TSDataType.values()[byteBuffer.get()]);
+    }
+
+    encodings = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      encodings.add(TSEncoding.values()[byteBuffer.get()]);
+    }
+
+    compressors = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      compressors.add(CompressionType.values()[byteBuffer.get()]);
+    }
+
+    byte label = byteBuffer.get();
+    if (label >= 0) {
+      aliasList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          aliasList.add(ReadWriteIOUtils.readString(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      propsList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          propsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      tagsList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          tagsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      attributesList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          attributesList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    id = ReadWriteIOUtils.readString(byteBuffer);
+
+    return new CreateMultiTimeSeriesNode(
+        new PlanNodeId(id),
+        paths,
+        dataTypes,
+        encodings,
+        compressors,
+        propsList,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreateMultiTimeSeriesNode that = (CreateMultiTimeSeriesNode) o;
+    return this.getPlanNodeId().equals(that.getPlanNodeId())
+        && Objects.equals(paths, that.paths)
+        && Objects.equals(dataTypes, that.dataTypes)
+        && Objects.equals(encodings, that.encodings)
+        && Objects.equals(compressors, that.compressors)
+        && Objects.equals(propsList, that.propsList)
+        && Objects.equals(tagOffsets, that.tagOffsets)
+        && Objects.equals(aliasList, that.aliasList)
+        && Objects.equals(tagsList, that.tagsList)
+        && Objects.equals(attributesList, that.attributesList);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.CREATE_MULTI_TIME_SERIES.serialize(byteBuffer);
+
+    // paths
+    byteBuffer.putInt(paths.size());
+    for (PartialPath path : paths) {
+      ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+    }
+
+    // dataTypes
+    for (TSDataType dataType : dataTypes) {
+      byteBuffer.put((byte) dataType.ordinal());
+    }
+
+    // encodings
+    for (TSEncoding encoding : encodings) {
+      byteBuffer.put((byte) encoding.ordinal());
+    }
+
+    // compressors
+    for (CompressionType compressor : compressors) {
+      byteBuffer.put((byte) compressor.ordinal());
+    }
+
+    // alias
+    if (aliasList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (aliasList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (String alias : aliasList) {
+        ReadWriteIOUtils.write(alias, byteBuffer);
+      }
+    }
+
+    // props
+    if (propsList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (propsList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> props : propsList) {
+        ReadWriteIOUtils.write(props, byteBuffer);
+      }
+    }
+
+    // tags
+    if (tagsList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (tagsList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> tags : tagsList) {
+        ReadWriteIOUtils.write(tags, byteBuffer);
+      }
+    }
+
+    // attributes
+    if (attributesList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (attributesList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> attributes : attributesList) {
+        ReadWriteIOUtils.write(attributes, byteBuffer);
+      }
+    }
+  }
+
+  public int hashCode() {
+    return Objects.hash(
+        this.getPlanNodeId(),
+        paths,
+        dataTypes,
+        encodings,
+        compressors,
+        tagOffsets,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    Map<TRegionReplicaSet, CreateMultiTimeSeriesNode> splitMap = new HashMap<>();
+    for (int i = 0; i < paths.size(); i++) {
+      TRegionReplicaSet regionReplicaSet =
+          analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(paths.get(i).getDevice());
+      CreateMultiTimeSeriesNode tmpNode;
+      if (splitMap.containsKey(regionReplicaSet)) {
+        tmpNode = splitMap.get(regionReplicaSet);
+      } else {
+        tmpNode = new CreateMultiTimeSeriesNode(this.getPlanNodeId());
+        tmpNode.setRegionReplicaSet(regionReplicaSet);
+        splitMap.put(regionReplicaSet, tmpNode);
+      }
+      tmpNode.addTimeSeries(
+          paths.get(i),
+          dataTypes.get(i),
+          encodings.get(i),
+          compressors.get(i),
+          propsList == null ? null : propsList.get(i),
+          aliasList == null ? null : aliasList.get(i),
+          attributesList == null ? null : tagsList.get(i),
+          attributesList == null ? null : attributesList.get(i));
+    }
+    return new ArrayList<>(splitMap.values());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 77bd11ebd6..16bbdff8dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
@@ -78,6 +79,12 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(createAlignedTimeSeriesStatement, context);
   }
 
+  // Create Multi Timeseries
+  public R visitCreateMultiTimeseries(
+      CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, C context) {
+    return visitStatement(createMultiTimeSeriesStatement, context);
+  }
+
   // Alter Timeseries
   public R visitAlterTimeseries(AlterTimeSeriesStatement alterTimeSeriesStatement, C context) {
     return visitStatement(alterTimeSeriesStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
new file mode 100644
index 0000000000..d46dbc4214
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** CREATE MULTI TIMESERIES statement. */
+public class CreateMultiTimeSeriesStatement extends Statement {
+
+  private List<PartialPath> paths;
+  private List<TSDataType> dataTypes = new ArrayList<>();
+  private List<TSEncoding> encodings = new ArrayList<>();
+  private List<CompressionType> compressors = new ArrayList<>();
+  private List<Map<String, String>> propsList;
+  private List<String> aliasList;
+  private List<Map<String, String>> tagsList;
+  private List<Map<String, String>> attributesList;
+  private List<Long> tagOffsets;
+
+  public CreateMultiTimeSeriesStatement() {
+    super();
+    statementType = StatementType.CREATE_MULTI_TIMESERIES;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<PartialPath> paths) {
+    this.paths = paths;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  public List<Map<String, String>> getPropsList() {
+    return propsList;
+  }
+
+  public void setPropsList(List<Map<String, String>> propsList) {
+    this.propsList = propsList;
+  }
+
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  public void addAttributesList(Map<String, String> attributes) {
+    this.attributesList.add(attributes);
+  }
+
+  public List<Long> getTagOffsets() {
+    if (tagOffsets == null) {
+      tagOffsets = new ArrayList<>();
+      for (int i = 0; i < paths.size(); i++) {
+        tagOffsets.add(Long.parseLong("-1"));
+      }
+    }
+    return tagOffsets;
+  }
+
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
+  public void addTagOffsets(Long tagsOffset) {
+    this.tagOffsets.add(tagsOffset);
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateMultiTimeseries(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index c9635fced0..f7f863a33c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
@@ -367,7 +368,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
             req.getMeasurements());
       }
 
-      // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement
+      // Step 1: transfer from CreateAlignedTimeSeriesReq to Statement
       CreateAlignedTimeSeriesStatement statement =
           (CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
 
@@ -397,7 +398,45 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
-    throw new UnsupportedOperationException();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session-{} create {} timeseries, the first is {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.getPaths().size(),
+            req.getPaths().get(0));
+      }
+
+      // Step 1: transfer from CreateMultiTimeSeriesReq to Statement
+      CreateMultiTimeSeriesStatement statement =
+          (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index dec4924081..a0b6261afb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -37,11 +37,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -187,6 +190,91 @@ public class LogicalPlannerTest {
     }
   }
 
+  @Test
+  public void testCreateMultiTimeSeriesPlan() {
+    try {
+      TSCreateMultiTimeseriesReq req = new TSCreateMultiTimeseriesReq();
+      req.setPaths(
+          new ArrayList<String>() {
+            {
+              add("root.sg1.d2.s1");
+              add("root.sg1.d2.s2");
+            }
+          });
+      req.setMeasurementAliasList(
+          new ArrayList<String>() {
+            {
+              add("meter1");
+              add(null);
+            }
+          });
+      req.setDataTypes(
+          new ArrayList<Integer>() {
+            {
+              add(TSDataType.FLOAT.ordinal());
+              add(TSDataType.FLOAT.ordinal());
+            }
+          });
+      req.setEncodings(
+          new ArrayList<Integer>() {
+            {
+              add(TSEncoding.PLAIN.ordinal());
+              add(TSEncoding.PLAIN.ordinal());
+            }
+          });
+      req.setCompressors(
+          new ArrayList<Integer>() {
+            {
+              add(CompressionType.SNAPPY.ordinal());
+              add(CompressionType.SNAPPY.ordinal());
+            }
+          });
+      req.setAttributesList(
+          new ArrayList<Map<String, String>>() {
+            {
+              add(
+                  new HashMap<String, String>() {
+                    {
+                      put("attr1", "a1");
+                    }
+                  });
+              add(null);
+            }
+          });
+      req.setTagsList(
+          new ArrayList<Map<String, String>>() {
+            {
+              add(
+                  new HashMap<String, String>() {
+                    {
+                      put("tag1", "t1");
+                    }
+                  });
+              add(null);
+            }
+          });
+      CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement =
+          (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
+      MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+      Analyzer analyzer =
+          new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+      Analysis analysis = analyzer.analyze(createMultiTimeSeriesStatement);
+      LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+      CreateMultiTimeSeriesNode createMultiTimeSeriesNode =
+          (CreateMultiTimeSeriesNode) planner.plan(analysis).getRootNode();
+      // Test serialize and deserialize
+      ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+      createMultiTimeSeriesNode.serialize(byteBuffer);
+      byteBuffer.flip();
+      CreateMultiTimeSeriesNode createMultiTimeSeriesNode1 =
+          (CreateMultiTimeSeriesNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
+      Assert.assertEquals(createMultiTimeSeriesNode, createMultiTimeSeriesNode1);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   @Test
   public void testAlterTimeseriesPlan() {
     String sql = "ALTER timeseries root.turbine.d1.s1 RENAME 'tag1' TO 'newTag1'";
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index b20fa5364d..3fafea7cfd 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -252,6 +253,105 @@ public class InternalServiceImplTest {
     Assert.assertTrue(response.accepted);
   }
 
+  @Test
+  public void testCreateMultiTimeSeries() throws MetadataException {
+    CreateMultiTimeSeriesNode createMultiTimeSeriesNode =
+        new CreateMultiTimeSeriesNode(
+            new PlanNodeId("0"),
+            new ArrayList<PartialPath>() {
+              {
+                add(new PartialPath("root.ln.d3.s1"));
+                add(new PartialPath("root.ln.d3.s2"));
+              }
+            },
+            new ArrayList<TSDataType>() {
+              {
+                add(TSDataType.FLOAT);
+                add(TSDataType.FLOAT);
+              }
+            },
+            new ArrayList<TSEncoding>() {
+              {
+                add(TSEncoding.PLAIN);
+                add(TSEncoding.PLAIN);
+              }
+            },
+            new ArrayList<CompressionType>() {
+              {
+                add(CompressionType.SNAPPY);
+                add(CompressionType.SNAPPY);
+              }
+            },
+            new ArrayList<Map<String, String>>() {
+              {
+                add(
+                    new HashMap<String, String>() {
+                      {
+                        put("MAX_POINT_NUMBER", "3");
+                      }
+                    });
+                add(null);
+              }
+            },
+            new ArrayList<String>() {
+              {
+                add("meter1");
+                add(null);
+              }
+            },
+            new ArrayList<Map<String, String>>() {
+              {
+                add(
+                    new HashMap<String, String>() {
+                      {
+                        put("tag1", "t1");
+                      }
+                    });
+                add(null);
+              }
+            },
+            new ArrayList<Map<String, String>>() {
+              {
+                add(
+                    new HashMap<String, String>() {
+                      {
+                        put("tag1", "t1");
+                      }
+                    });
+                add(null);
+              }
+            });
+
+    TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+    PlanFragment planFragment =
+        new PlanFragment(new PlanFragmentId("2", 3), createMultiTimeSeriesNode);
+    FragmentInstance fragmentInstance =
+        new FragmentInstance(
+            planFragment,
+            planFragment.getId().genFragmentInstanceId(),
+            new GroupByFilter(1, 2, 3, 4),
+            QueryType.WRITE);
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+
+    // serialize fragmentInstance
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    fragmentInstance.serializeRequest(byteBuffer);
+    byteBuffer.flip();
+
+    // put serialized fragmentInstance to TSendFragmentInstanceReq
+    TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+    TFragmentInstance tFragmentInstance = new TFragmentInstance();
+    tFragmentInstance.setBody(byteBuffer);
+    request.setFragmentInstance(tFragmentInstance);
+    request.setConsensusGroupId(regionReplicaSet.getRegionId());
+    request.setQueryType(QueryType.WRITE.toString());
+
+    // Use consensus layer to execute request
+    TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+    Assert.assertTrue(response.accepted);
+  }
+
   private TRegionReplicaSet genRegionReplicaSet() {
     List<TDataNodeLocation> dataNodeList = new ArrayList<>();
     dataNodeList.add(