You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/13 07:55:17 UTC

[iotdb] 01/01: Support CreateMultiTimeseries in new cluster

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch multitimeseries
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79ef4a6e4ae5c4e8a7de82cc5b6a1c2318958510
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri May 13 15:54:54 2022 +0800

    Support CreateMultiTimeseries in new cluster
---
 .../metadata/visitor/SchemaExecutionVisitor.java   |  50 +++
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  15 +
 .../db/mpp/plan/parser/StatementGenerator.java     |  32 ++
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  16 +
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../metedata/write/CreateMultiTimeSeriesNode.java  | 409 +++++++++++++++++++++
 .../db/mpp/plan/statement/StatementVisitor.java    |   7 +
 .../metadata/CreateMultiTimeSeriesStatement.java   | 160 ++++++++
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  43 ++-
 10 files changed, 740 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 657f737b3e..6561eb6dee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -27,9 +27,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 /** Schema write PlanNode visitor */
 public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@@ -69,6 +72,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
   }
 
+  @Override
+  public TSStatus visitCreateMultiTimeSeries(
+      CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    CreateMultiTimeSeriesPlan multiPlan =
+        (CreateMultiTimeSeriesPlan)
+            node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+    for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+      if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+        continue;
+      }
+      CreateTimeSeriesPlan plan =
+          new CreateTimeSeriesPlan(
+              multiPlan.getPaths().get(i),
+              multiPlan.getDataTypes().get(i),
+              multiPlan.getEncodings().get(i),
+              multiPlan.getCompressors().get(i),
+              multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+              multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+              multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+              multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+      try {
+        schemaRegion.createTimeseries(plan, -1);
+      } catch (MetadataException e) {
+        logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+        multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!multiPlan.getResults().isEmpty()) {
+      return RpcUtils.getStatus(Arrays.asList(multiPlan.getFailingStatus()));
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+  }
+
   @Override
   public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
     try {
@@ -143,6 +179,20 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
           node.getTagsList(),
           node.getAttributesList());
     }
+
+    public PhysicalPlan visitCreateMultiTimeSeries(
+        CreateMultiTimeSeriesNode node, TransformerContext context) {
+
+      CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
+      multiPlan.setPaths(node.getPaths());
+      multiPlan.setDataTypes(node.getDataTypes());
+      multiPlan.setEncodings(node.getEncodings());
+      multiPlan.setCompressors(node.getCompressors());
+      multiPlan.setAlias(node.getAliasList());
+      multiPlan.setTags(node.getTagsList());
+      multiPlan.setAttributes(node.getAttributesList());
+      return multiPlan;
+    }
   }
 
   private static class TransformerContext {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index f52ba6fd77..5c698d1ff0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -854,6 +855,20 @@ public class Analyzer {
       return analysis;
     }
 
+    @Override
+    public Analysis visitCreateMultiTimeseries(
+        CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
+      Analysis analysis = new Analysis();
+      analysis.setStatement(createMultiTimeSeriesStatement);
+
+      SchemaPartition schemaPartitionInfo =
+          partitionFetcher.getOrCreateSchemaPartition(
+              new PathPatternTree(createMultiTimeSeriesStatement.getPaths()));
+      analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+      return analysis;
+    }
+
     @Override
     public Analysis visitAlterTimeseries(
         AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 51726de6ca..a473299bb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.qp.strategy.SQLParseError;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -373,6 +375,36 @@ public class StatementGenerator {
     return statement;
   }
 
+  public static Statement createStatement(TSCreateMultiTimeseriesReq req)
+      throws IllegalPathException {
+    // construct create multi timeseries statement
+    CreateMultiTimeSeriesStatement statement = new CreateMultiTimeSeriesStatement();
+    List<PartialPath> paths = new ArrayList<>();
+    for (String path : req.paths) {
+      paths.add(new PartialPath(path));
+    }
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (int dataType : req.dataTypes) {
+      dataTypes.add(TSDataType.values()[dataType]);
+    }
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (int encoding : req.encodings) {
+      encodings.add(TSEncoding.values()[encoding]);
+    }
+    List<CompressionType> compressors = new ArrayList<>();
+    for (int compressor : req.compressors) {
+      compressors.add(CompressionType.values()[compressor]);
+    }
+    statement.setPaths(paths);
+    statement.setDataTypes(dataTypes);
+    statement.setEncodings(encodings);
+    statement.setCompressors(compressors);
+    statement.setTagsList(req.tagsList);
+    statement.setAttributesList(req.attributesList);
+    statement.setAliasList(req.measurementAliasList);
+    return statement;
+  }
+
   private static Statement invokeParser(String sql, ZoneId zoneId) {
     ASTVisitor astVisitor = new ASTVisitor();
     astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index befcdb71f4..97dfd8344e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -281,6 +283,20 @@ public class LogicalPlanner {
           createAlignedTimeSeriesStatement.getAttributesList());
     }
 
+    @Override
+    public PlanNode visitCreateMultiTimeseries(
+        CreateMultiTimeSeriesStatement createAlignedTimeSeriesStatement, MPPQueryContext context) {
+      return new CreateMultiTimeSeriesNode(
+          context.getQueryId().genPlanNodeId(),
+          createAlignedTimeSeriesStatement.getPaths(),
+          createAlignedTimeSeriesStatement.getDataTypes(),
+          createAlignedTimeSeriesStatement.getEncodings(),
+          createAlignedTimeSeriesStatement.getCompressors(),
+          createAlignedTimeSeriesStatement.getAliasList(),
+          createAlignedTimeSeriesStatement.getTagsList(),
+          createAlignedTimeSeriesStatement.getAttributesList());
+    }
+
     @Override
     public PlanNode visitAlterTimeseries(
         AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 5662d0ca54..339445a86e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -98,7 +99,8 @@ public enum PlanNodeType {
   ALIGNED_SERIES_AGGREGATE_SCAN((short) 34),
   DEVICE_MERGE((short) 35),
   SCHEMA_FETCH_MERGE((short) 36),
-  TRANSFORM((short) 37);
+  TRANSFORM((short) 37),
+  CREATE_MULTI_TIME_SERIES((short) 38);
 
   private final short nodeType;
 
@@ -198,6 +200,8 @@ public enum PlanNodeType {
         return SchemaFetchMergeNode.deserialize(buffer);
       case 37:
         return TransformNode.deserialize(buffer);
+      case 38:
+        return CreateMultiTimeSeriesNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8a3d3ec5be..c681a60a51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
@@ -184,6 +185,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
new file mode 100644
index 0000000000..8ba1167b76
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateMultiTimeSeriesNode extends WritePlanNode {
+  private List<PartialPath> paths = new ArrayList<>();;
+  private List<TSDataType> dataTypes = new ArrayList<>();
+  private List<TSEncoding> encodings = new ArrayList<>();
+  private List<CompressionType> compressors = new ArrayList<>();
+  private List<String> aliasList;
+  private List<Map<String, String>> tagsList;
+  private List<Map<String, String>> attributesList;
+  private List<Long> tagOffsets;
+  private TRegionReplicaSet regionReplicaSet;
+
+  public CreateMultiTimeSeriesNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public CreateMultiTimeSeriesNode(
+      PlanNodeId id,
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> aliasList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList) {
+    super(id);
+    this.paths = paths;
+    this.dataTypes = dataTypes;
+    this.encodings = encodings;
+    this.compressors = compressors;
+    this.aliasList = aliasList;
+    this.tagsList = tagsList;
+    this.attributesList = attributesList;
+  }
+
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<PartialPath> paths) {
+    this.paths = paths;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  public List<Long> getTagOffsets() {
+    return tagOffsets;
+  }
+
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
+  public void addTimeSeries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes) {
+    this.paths.add(path);
+    this.dataTypes.add(dataType);
+    this.encodings.add(encoding);
+    this.compressors.add(compressor);
+    if (alias != null) {
+      if (this.aliasList == null) {
+        aliasList = new ArrayList<>();
+      }
+      aliasList.add(alias);
+    }
+    if (tags != null) {
+      if (this.tagsList == null) {
+        tagsList = new ArrayList<>();
+      }
+      tagsList.add(tags);
+    }
+    if (attributes != null) {
+      if (this.attributesList == null) {
+        attributesList = new ArrayList<>();
+      }
+      attributesList.add(attributes);
+    }
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {}
+
+  @Override
+  public PlanNode clone() {
+    throw new NotImplementedException("Clone of CreateMultiTimeSeriesNode is not implemented");
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+    return visitor.visitCreateMultiTimeSeries(this, schemaRegion);
+  }
+
+  public static CreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
+    String id;
+    List<PartialPath> paths;
+    List<TSDataType> dataTypes;
+    List<TSEncoding> encodings;
+    List<CompressionType> compressors;
+    List<String> aliasList = null;
+    List<Map<String, String>> tagsList = null;
+    List<Map<String, String>> attributesList = null;
+
+    int size = byteBuffer.getInt();
+    paths = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      try {
+        paths.add(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+      } catch (IllegalPathException e) {
+        throw new IllegalArgumentException("Can not deserialize CreateMultiTimeSeriesNode", e);
+      }
+    }
+
+    dataTypes = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      dataTypes.add(TSDataType.values()[byteBuffer.get()]);
+    }
+
+    encodings = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      encodings.add(TSEncoding.values()[byteBuffer.get()]);
+    }
+
+    compressors = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      compressors.add(CompressionType.values()[byteBuffer.get()]);
+    }
+
+    byte label = byteBuffer.get();
+    if (label >= 0) {
+      aliasList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          aliasList.add(ReadWriteIOUtils.readString(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      tagsList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          tagsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    label = byteBuffer.get();
+    if (label >= 0) {
+      attributesList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          attributesList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    id = ReadWriteIOUtils.readString(byteBuffer);
+
+    return new CreateMultiTimeSeriesNode(
+        new PlanNodeId(id),
+        paths,
+        dataTypes,
+        encodings,
+        compressors,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreateMultiTimeSeriesNode that = (CreateMultiTimeSeriesNode) o;
+    return this.getPlanNodeId().equals(that.getPlanNodeId())
+        && Objects.equals(paths, that.paths)
+        && Objects.equals(dataTypes, that.dataTypes)
+        && Objects.equals(encodings, that.encodings)
+        && Objects.equals(compressors, that.compressors)
+        && Objects.equals(tagOffsets, that.tagOffsets)
+        && Objects.equals(aliasList, that.aliasList)
+        && Objects.equals(tagsList, that.tagsList)
+        && Objects.equals(attributesList, that.attributesList);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.CREATE_MULTI_TIME_SERIES.serialize(byteBuffer);
+
+    // paths
+    byteBuffer.putInt(paths.size());
+    for (PartialPath path : paths) {
+      ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+    }
+
+    // dataTypes
+    for (TSDataType dataType : dataTypes) {
+      byteBuffer.put((byte) dataType.ordinal());
+    }
+
+    // encodings
+    for (TSEncoding encoding : encodings) {
+      byteBuffer.put((byte) encoding.ordinal());
+    }
+
+    // compressors
+    for (CompressionType compressor : compressors) {
+      byteBuffer.put((byte) compressor.ordinal());
+    }
+
+    // alias
+    if (aliasList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (aliasList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (String alias : aliasList) {
+        ReadWriteIOUtils.write(alias, byteBuffer);
+      }
+    }
+
+    // tags
+    if (tagsList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (tagsList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> tags : tagsList) {
+        ReadWriteIOUtils.write(tags, byteBuffer);
+      }
+    }
+
+    // attributes
+    if (attributesList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (attributesList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> attributes : attributesList) {
+        ReadWriteIOUtils.write(attributes, byteBuffer);
+      }
+    }
+  }
+
+  public int hashCode() {
+    return Objects.hash(
+        this.getPlanNodeId(),
+        paths,
+        dataTypes,
+        encodings,
+        compressors,
+        tagOffsets,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    Map<TRegionReplicaSet, CreateMultiTimeSeriesNode> splitMap = new HashMap<>();
+    for (int i = 0; i < paths.size(); i++) {
+      TRegionReplicaSet regionReplicaSet =
+          analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(paths.get(i).getDevice());
+      CreateMultiTimeSeriesNode tmpNode;
+      if (splitMap.containsKey(regionReplicaSet)) {
+        tmpNode = splitMap.get(regionReplicaSet);
+      } else {
+        tmpNode = new CreateMultiTimeSeriesNode(this.getPlanNodeId());
+        tmpNode.setRegionReplicaSet(regionReplicaSet);
+        splitMap.put(regionReplicaSet, tmpNode);
+      }
+      tmpNode.addTimeSeries(
+          paths.get(i),
+          dataTypes.get(i),
+          encodings.get(i),
+          compressors.get(i),
+          aliasList == null ? null : aliasList.get(i),
+          attributesList == null ? null : tagsList.get(i),
+          attributesList == null ? null : attributesList.get(i));
+    }
+    return new ArrayList<>(splitMap.values());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 59f16004b3..59d11c3c9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
@@ -77,6 +78,12 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(createAlignedTimeSeriesStatement, context);
   }
 
+  // Create Multi Timeseries
+  public R visitCreateMultiTimeseries(
+      CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, C context) {
+    return visitStatement(createMultiTimeSeriesStatement, context);
+  }
+
   // Alter Timeseries
   public R visitAlterTimeseries(AlterTimeSeriesStatement alterTimeSeriesStatement, C context) {
     return visitStatement(alterTimeSeriesStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
new file mode 100644
index 0000000000..341e4d12a1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** CREATE MULTI TIMESERIES statement. */
+public class CreateMultiTimeSeriesStatement extends Statement {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(CreateMultiTimeSeriesStatement.class);
+
+  private List<PartialPath> paths;
+  private List<TSDataType> dataTypes = new ArrayList<>();
+  private List<TSEncoding> encodings = new ArrayList<>();
+  private List<CompressionType> compressors = new ArrayList<>();
+  private List<String> aliasList = new ArrayList<>();
+  private List<Map<String, String>> tagsList = new ArrayList<>();
+  private List<Map<String, String>> attributesList = new ArrayList<>();
+  private List<Long> tagOffsets = null;
+
+  public CreateMultiTimeSeriesStatement() {
+    super();
+    statementType = StatementType.CREATE_MULTI_TIMESERIES;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<PartialPath> paths) {
+    this.paths = paths;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public void addDataType(TSDataType dataType) {
+    this.dataTypes.add(dataType);
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public void addEncoding(TSEncoding encoding) {
+    this.encodings.add(encoding);
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  public void addCompressor(CompressionType compression) {
+    this.compressors.add(compression);
+  }
+
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  public void addAliasList(String alias) {
+    this.aliasList.add(alias);
+  }
+
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  public void addTagsList(Map<String, String> tags) {
+    this.tagsList.add(tags);
+  }
+
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  public void addAttributesList(Map<String, String> attributes) {
+    this.attributesList.add(attributes);
+  }
+
+  public List<Long> getTagOffsets() {
+    if (tagOffsets == null) {
+      tagOffsets = new ArrayList<>();
+      for (int i = 0; i < paths.size(); i++) {
+        tagOffsets.add(Long.parseLong("-1"));
+      }
+    }
+    return tagOffsets;
+  }
+
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
+  public void addTagOffsets(Long tagsOffset) {
+    this.tagOffsets.add(tagsOffset);
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateMultiTimeseries(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index c9635fced0..f7f863a33c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
@@ -367,7 +368,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
             req.getMeasurements());
       }
 
-      // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement
+      // Step 1: transfer from CreateAlignedTimeSeriesReq to Statement
       CreateAlignedTimeSeriesStatement statement =
           (CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
 
@@ -397,7 +398,45 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
-    throw new UnsupportedOperationException();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session-{} create {} timeseries, the first is {}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.getPaths().size(),
+            req.getPaths().get(0));
+      }
+
+      // Step 1: transfer from CreateMultiTimeSeriesReq to Statement
+      CreateMultiTimeSeriesStatement statement =
+          (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
   }
 
   @Override