You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/15 05:03:13 UTC
[iotdb] branch master updated: [IOTDB-3172] Support CreateMultTimeseries API for new cluster (#5896)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3857fab2b1 [IOTDB-3172] Support CreateMultTimeseries API for new cluster (#5896)
3857fab2b1 is described below
commit 3857fab2b13f0af282032b8f28cc2035c3f50a27
Author: Haonan <hh...@outlook.com>
AuthorDate: Sun May 15 13:03:07 2022 +0800
[IOTDB-3172] Support CreateMultTimeseries API for new cluster (#5896)
---
.../metadata/visitor/SchemaExecutionVisitor.java | 50 +++
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 15 +
.../db/mpp/plan/parser/StatementGenerator.java | 33 ++
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 17 +
.../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../metedata/write/CreateMultiTimeSeriesNode.java | 453 +++++++++++++++++++++
.../db/mpp/plan/statement/StatementVisitor.java | 7 +
.../metadata/CreateMultiTimeSeriesStatement.java | 143 +++++++
.../thrift/impl/DataNodeTSIServiceImpl.java | 43 +-
.../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 88 ++++
.../iotdb/db/service/InternalServiceImplTest.java | 100 +++++
12 files changed, 957 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 9988e8b8da..b6b55c731b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -30,9 +30,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -42,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
/** Schema write PlanNode visitor */
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@@ -72,6 +75,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
+ @Override
+ public TSStatus visitCreateMultiTimeSeries(
+ CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ CreateMultiTimeSeriesPlan multiPlan =
+ (CreateMultiTimeSeriesPlan)
+ node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+ for (int i = 0; i < multiPlan.getPaths().size(); i++) {
+ if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
+ continue;
+ }
+ CreateTimeSeriesPlan plan =
+ new CreateTimeSeriesPlan(
+ multiPlan.getPaths().get(i),
+ multiPlan.getDataTypes().get(i),
+ multiPlan.getEncodings().get(i),
+ multiPlan.getCompressors().get(i),
+ multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+ multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+ multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+ multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+ try {
+ schemaRegion.createTimeseries(plan, -1);
+ } catch (MetadataException e) {
+ logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!multiPlan.getResults().isEmpty()) {
+ return RpcUtils.getStatus(Arrays.asList(multiPlan.getFailingStatus()));
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+ }
+
@Override
public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
@@ -146,6 +182,20 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
node.getTagsList(),
node.getAttributesList());
}
+
+ public PhysicalPlan visitCreateMultiTimeSeries(
+ CreateMultiTimeSeriesNode node, TransformerContext context) {
+ CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
+ multiPlan.setPaths(node.getPaths());
+ multiPlan.setDataTypes(node.getDataTypes());
+ multiPlan.setEncodings(node.getEncodings());
+ multiPlan.setCompressors(node.getCompressors());
+ multiPlan.setProps(node.getPropsList());
+ multiPlan.setAlias(node.getAliasList());
+ multiPlan.setTags(node.getTagsList());
+ multiPlan.setAttributes(node.getAttributesList());
+ return multiPlan;
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 9522387207..fd6053fd20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -854,6 +855,20 @@ public class Analyzer {
return analysis;
}
+ @Override
+ public Analysis visitCreateMultiTimeseries(
+ CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(createMultiTimeSeriesStatement);
+
+ SchemaPartition schemaPartitionInfo =
+ partitionFetcher.getOrCreateSchemaPartition(
+ new PathPatternTree(createMultiTimeSeriesStatement.getPaths()));
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ return analysis;
+ }
+
@Override
public Analysis visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 51726de6ca..fdd5f36f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.qp.strategy.SQLParseError;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -373,6 +375,37 @@ public class StatementGenerator {
return statement;
}
+ public static Statement createStatement(TSCreateMultiTimeseriesReq req)
+ throws IllegalPathException {
+ // construct create multi timeseries statement
+ List<PartialPath> paths = new ArrayList<>();
+ for (String path : req.paths) {
+ paths.add(new PartialPath(path));
+ }
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (int dataType : req.dataTypes) {
+ dataTypes.add(TSDataType.values()[dataType]);
+ }
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (int encoding : req.encodings) {
+ encodings.add(TSEncoding.values()[encoding]);
+ }
+ List<CompressionType> compressors = new ArrayList<>();
+ for (int compressor : req.compressors) {
+ compressors.add(CompressionType.values()[compressor]);
+ }
+ CreateMultiTimeSeriesStatement statement = new CreateMultiTimeSeriesStatement();
+ statement.setPaths(paths);
+ statement.setDataTypes(dataTypes);
+ statement.setEncodings(encodings);
+ statement.setCompressors(compressors);
+ statement.setPropsList(req.propsList);
+ statement.setTagsList(req.tagsList);
+ statement.setAttributesList(req.attributesList);
+ statement.setAliasList(req.measurementAliasList);
+ return statement;
+ }
+
private static Statement invokeParser(String sql, ZoneId zoneId) {
ASTVisitor astVisitor = new ASTVisitor();
astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index befcdb71f4..57d0854b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -281,6 +283,21 @@ public class LogicalPlanner {
createAlignedTimeSeriesStatement.getAttributesList());
}
+ @Override
+ public PlanNode visitCreateMultiTimeseries(
+ CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+ return new CreateMultiTimeSeriesNode(
+ context.getQueryId().genPlanNodeId(),
+ createMultiTimeSeriesStatement.getPaths(),
+ createMultiTimeSeriesStatement.getDataTypes(),
+ createMultiTimeSeriesStatement.getEncodings(),
+ createMultiTimeSeriesStatement.getCompressors(),
+ createMultiTimeSeriesStatement.getPropsList(),
+ createMultiTimeSeriesStatement.getAliasList(),
+ createMultiTimeSeriesStatement.getTagsList(),
+ createMultiTimeSeriesStatement.getAttributesList());
+ }
+
@Override
public PlanNode visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 1d52cd0231..4fe62bad82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -99,7 +100,8 @@ public enum PlanNodeType {
DEVICE_MERGE((short) 35),
SCHEMA_FETCH_MERGE((short) 36),
TRANSFORM((short) 37),
- DELETE_REGION((short) 38);
+ DELETE_REGION((short) 38),
+ CREATE_MULTI_TIME_SERIES((short) 39);
private final short nodeType;
@@ -201,6 +203,8 @@ public enum PlanNodeType {
return TransformNode.deserialize(buffer);
case 38:
return DeleteRegionNode.deserialize(buffer);
+ case 39:
+ return CreateMultiTimeSeriesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 959cfa3a62..b11dac5a26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
@@ -184,6 +185,10 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
new file mode 100644
index 0000000000..52fa45a411
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateMultiTimeSeriesNode extends WritePlanNode {
+ private List<PartialPath> paths = new ArrayList<>();
+ private List<TSDataType> dataTypes = new ArrayList<>();
+ private List<TSEncoding> encodings = new ArrayList<>();
+ private List<CompressionType> compressors = new ArrayList<>();
+ private List<String> aliasList;
+ private List<Map<String, String>> propsList;
+ private List<Map<String, String>> tagsList;
+ private List<Map<String, String>> attributesList;
+ private List<Long> tagOffsets;
+ private TRegionReplicaSet regionReplicaSet;
+
+ public CreateMultiTimeSeriesNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public CreateMultiTimeSeriesNode(
+ PlanNodeId id,
+ List<PartialPath> paths,
+ List<TSDataType> dataTypes,
+ List<TSEncoding> encodings,
+ List<CompressionType> compressors,
+ List<Map<String, String>> propsList,
+ List<String> aliasList,
+ List<Map<String, String>> tagsList,
+ List<Map<String, String>> attributesList) {
+ super(id);
+ this.paths = paths;
+ this.dataTypes = dataTypes;
+ this.encodings = encodings;
+ this.compressors = compressors;
+ this.propsList = propsList;
+ this.aliasList = aliasList;
+ this.tagsList = tagsList;
+ this.attributesList = attributesList;
+ }
+
+ public List<PartialPath> getPaths() {
+ return paths;
+ }
+
+ public void setPaths(List<PartialPath> paths) {
+ this.paths = paths;
+ }
+
+ public List<TSDataType> getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(List<TSDataType> dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public List<TSEncoding> getEncodings() {
+ return encodings;
+ }
+
+ public void setEncodings(List<TSEncoding> encodings) {
+ this.encodings = encodings;
+ }
+
+ public List<CompressionType> getCompressors() {
+ return compressors;
+ }
+
+ public void setCompressors(List<CompressionType> compressors) {
+ this.compressors = compressors;
+ }
+
+ public List<Map<String, String>> getPropsList() {
+ return propsList;
+ }
+
+ public void setPropsList(List<Map<String, String>> propsList) {
+ this.propsList = propsList;
+ }
+
+ public List<String> getAliasList() {
+ return aliasList;
+ }
+
+ public void setAliasList(List<String> aliasList) {
+ this.aliasList = aliasList;
+ }
+
+ public List<Map<String, String>> getTagsList() {
+ return tagsList;
+ }
+
+ public void setTagsList(List<Map<String, String>> tagsList) {
+ this.tagsList = tagsList;
+ }
+
+ public List<Map<String, String>> getAttributesList() {
+ return attributesList;
+ }
+
+ public void setAttributesList(List<Map<String, String>> attributesList) {
+ this.attributesList = attributesList;
+ }
+
+ public List<Long> getTagOffsets() {
+ return tagOffsets;
+ }
+
+ public void setTagOffsets(List<Long> tagOffsets) {
+ this.tagOffsets = tagOffsets;
+ }
+
+ public void addTimeSeries(
+ PartialPath path,
+ TSDataType dataType,
+ TSEncoding encoding,
+ CompressionType compressor,
+ Map<String, String> props,
+ String alias,
+ Map<String, String> tags,
+ Map<String, String> attributes) {
+ this.paths.add(path);
+ this.dataTypes.add(dataType);
+ this.encodings.add(encoding);
+ this.compressors.add(compressor);
+ if (props != null) {
+ if (this.propsList == null) {
+ propsList = new ArrayList<>();
+ }
+ propsList.add(props);
+ }
+ if (alias != null) {
+ if (this.aliasList == null) {
+ aliasList = new ArrayList<>();
+ }
+ aliasList.add(alias);
+ }
+ if (tags != null) {
+ if (this.tagsList == null) {
+ tagsList = new ArrayList<>();
+ }
+ tagsList.add(tags);
+ }
+ if (attributes != null) {
+ if (this.attributesList == null) {
+ attributesList = new ArrayList<>();
+ }
+ attributesList.add(attributes);
+ }
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {}
+
+ @Override
+ public PlanNode clone() {
+ throw new NotImplementedException("Clone of CreateMultiTimeSeriesNode is not implemented");
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+ return visitor.visitCreateMultiTimeSeries(this, schemaRegion);
+ }
+
+ public static CreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
+ String id;
+ List<PartialPath> paths;
+ List<TSDataType> dataTypes;
+ List<TSEncoding> encodings;
+ List<CompressionType> compressors;
+ List<String> aliasList = null;
+ List<Map<String, String>> propsList = null;
+ List<Map<String, String>> tagsList = null;
+ List<Map<String, String>> attributesList = null;
+
+ int size = byteBuffer.getInt();
+ paths = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ try {
+ paths.add(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Can not deserialize CreateMultiTimeSeriesNode", e);
+ }
+ }
+
+ dataTypes = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ dataTypes.add(TSDataType.values()[byteBuffer.get()]);
+ }
+
+ encodings = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ encodings.add(TSEncoding.values()[byteBuffer.get()]);
+ }
+
+ compressors = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ compressors.add(CompressionType.values()[byteBuffer.get()]);
+ }
+
+ byte label = byteBuffer.get();
+ if (label >= 0) {
+ aliasList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ aliasList.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
+ }
+ }
+
+ label = byteBuffer.get();
+ if (label >= 0) {
+ propsList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ propsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+ }
+ }
+ }
+
+ label = byteBuffer.get();
+ if (label >= 0) {
+ tagsList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ tagsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+ }
+ }
+ }
+
+ label = byteBuffer.get();
+ if (label >= 0) {
+ attributesList = new ArrayList<>();
+ if (label == 1) {
+ for (int i = 0; i < size; i++) {
+ attributesList.add(ReadWriteIOUtils.readMap(byteBuffer));
+ }
+ }
+ }
+
+ id = ReadWriteIOUtils.readString(byteBuffer);
+
+ return new CreateMultiTimeSeriesNode(
+ new PlanNodeId(id),
+ paths,
+ dataTypes,
+ encodings,
+ compressors,
+ propsList,
+ aliasList,
+ tagsList,
+ attributesList);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CreateMultiTimeSeriesNode that = (CreateMultiTimeSeriesNode) o;
+ return this.getPlanNodeId().equals(that.getPlanNodeId())
+ && Objects.equals(paths, that.paths)
+ && Objects.equals(dataTypes, that.dataTypes)
+ && Objects.equals(encodings, that.encodings)
+ && Objects.equals(compressors, that.compressors)
+ && Objects.equals(propsList, that.propsList)
+ && Objects.equals(tagOffsets, that.tagOffsets)
+ && Objects.equals(aliasList, that.aliasList)
+ && Objects.equals(tagsList, that.tagsList)
+ && Objects.equals(attributesList, that.attributesList);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.CREATE_MULTI_TIME_SERIES.serialize(byteBuffer);
+
+ // paths
+ byteBuffer.putInt(paths.size());
+ for (PartialPath path : paths) {
+ ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+ }
+
+ // dataTypes
+ for (TSDataType dataType : dataTypes) {
+ byteBuffer.put((byte) dataType.ordinal());
+ }
+
+ // encodings
+ for (TSEncoding encoding : encodings) {
+ byteBuffer.put((byte) encoding.ordinal());
+ }
+
+ // compressors
+ for (CompressionType compressor : compressors) {
+ byteBuffer.put((byte) compressor.ordinal());
+ }
+
+ // alias
+ if (aliasList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (aliasList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (String alias : aliasList) {
+ ReadWriteIOUtils.write(alias, byteBuffer);
+ }
+ }
+
+ // props
+ if (propsList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (propsList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (Map<String, String> props : propsList) {
+ ReadWriteIOUtils.write(props, byteBuffer);
+ }
+ }
+
+ // tags
+ if (tagsList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (tagsList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (Map<String, String> tags : tagsList) {
+ ReadWriteIOUtils.write(tags, byteBuffer);
+ }
+ }
+
+ // attributes
+ if (attributesList == null) {
+ byteBuffer.put((byte) -1);
+ } else if (attributesList.isEmpty()) {
+ byteBuffer.put((byte) 0);
+ } else {
+ byteBuffer.put((byte) 1);
+ for (Map<String, String> attributes : attributesList) {
+ ReadWriteIOUtils.write(attributes, byteBuffer);
+ }
+ }
+ }
+
+ public int hashCode() {
+ return Objects.hash(
+ this.getPlanNodeId(),
+ paths,
+ dataTypes,
+ encodings,
+ compressors,
+ tagOffsets,
+ aliasList,
+ tagsList,
+ attributesList);
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ Map<TRegionReplicaSet, CreateMultiTimeSeriesNode> splitMap = new HashMap<>();
+ for (int i = 0; i < paths.size(); i++) {
+ TRegionReplicaSet regionReplicaSet =
+ analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(paths.get(i).getDevice());
+ CreateMultiTimeSeriesNode tmpNode;
+ if (splitMap.containsKey(regionReplicaSet)) {
+ tmpNode = splitMap.get(regionReplicaSet);
+ } else {
+ tmpNode = new CreateMultiTimeSeriesNode(this.getPlanNodeId());
+ tmpNode.setRegionReplicaSet(regionReplicaSet);
+ splitMap.put(regionReplicaSet, tmpNode);
+ }
+ tmpNode.addTimeSeries(
+ paths.get(i),
+ dataTypes.get(i),
+ encodings.get(i),
+ compressors.get(i),
+ propsList == null ? null : propsList.get(i),
+ aliasList == null ? null : aliasList.get(i),
+ attributesList == null ? null : tagsList.get(i),
+ attributesList == null ? null : attributesList.get(i));
+ }
+ return new ArrayList<>(splitMap.values());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 77bd11ebd6..16bbdff8dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
@@ -78,6 +79,12 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(createAlignedTimeSeriesStatement, context);
}
+ // Create Multi Timeseries
+ public R visitCreateMultiTimeseries(
+ CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, C context) {
+ return visitStatement(createMultiTimeSeriesStatement, context);
+ }
+
// Alter Timeseries
public R visitAlterTimeseries(AlterTimeSeriesStatement alterTimeSeriesStatement, C context) {
return visitStatement(alterTimeSeriesStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
new file mode 100644
index 0000000000..d46dbc4214
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** CREATE MULTI TIMESERIES statement. */
+public class CreateMultiTimeSeriesStatement extends Statement {
+
+ private List<PartialPath> paths;
+ private List<TSDataType> dataTypes = new ArrayList<>();
+ private List<TSEncoding> encodings = new ArrayList<>();
+ private List<CompressionType> compressors = new ArrayList<>();
+ private List<Map<String, String>> propsList;
+ private List<String> aliasList;
+ private List<Map<String, String>> tagsList;
+ private List<Map<String, String>> attributesList;
+ private List<Long> tagOffsets;
+
+ public CreateMultiTimeSeriesStatement() {
+ super();
+ statementType = StatementType.CREATE_MULTI_TIMESERIES;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return paths;
+ }
+
+ public void setPaths(List<PartialPath> paths) {
+ this.paths = paths;
+ }
+
+ public List<TSDataType> getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(List<TSDataType> dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public List<TSEncoding> getEncodings() {
+ return encodings;
+ }
+
+ public void setEncodings(List<TSEncoding> encodings) {
+ this.encodings = encodings;
+ }
+
+ public List<CompressionType> getCompressors() {
+ return compressors;
+ }
+
+ public void setCompressors(List<CompressionType> compressors) {
+ this.compressors = compressors;
+ }
+
+ public List<Map<String, String>> getPropsList() {
+ return propsList;
+ }
+
+ public void setPropsList(List<Map<String, String>> propsList) {
+ this.propsList = propsList;
+ }
+
+ public List<String> getAliasList() {
+ return aliasList;
+ }
+
+ public void setAliasList(List<String> aliasList) {
+ this.aliasList = aliasList;
+ }
+
+ public List<Map<String, String>> getTagsList() {
+ return tagsList;
+ }
+
+ public void setTagsList(List<Map<String, String>> tagsList) {
+ this.tagsList = tagsList;
+ }
+
+ public List<Map<String, String>> getAttributesList() {
+ return attributesList;
+ }
+
+ public void setAttributesList(List<Map<String, String>> attributesList) {
+ this.attributesList = attributesList;
+ }
+
+ public void addAttributesList(Map<String, String> attributes) {
+ this.attributesList.add(attributes);
+ }
+
+ public List<Long> getTagOffsets() {
+ if (tagOffsets == null) {
+ tagOffsets = new ArrayList<>();
+ for (int i = 0; i < paths.size(); i++) {
+ tagOffsets.add(Long.parseLong("-1"));
+ }
+ }
+ return tagOffsets;
+ }
+
+ public void setTagOffsets(List<Long> tagOffsets) {
+ this.tagOffsets = tagOffsets;
+ }
+
+ public void addTagOffsets(Long tagsOffset) {
+ this.tagOffsets.add(tagsOffset);
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCreateMultiTimeseries(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index c9635fced0..f7f863a33c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.query.control.SessionManager;
@@ -367,7 +368,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
req.getMeasurements());
}
- // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement
+ // Step 1: transfer from CreateAlignedTimeSeriesReq to Statement
CreateAlignedTimeSeriesStatement statement =
(CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
@@ -397,7 +398,45 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
- throw new UnsupportedOperationException();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session-{} create {} timeseries, the first is {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.getPaths().size(),
+ req.getPaths().get(0));
+ }
+
+ // Step 1: transfer from CreateMultiTimeSeriesReq to Statement
+ CreateMultiTimeSeriesStatement statement =
+ (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index dec4924081..a0b6261afb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -37,11 +37,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -187,6 +190,91 @@ public class LogicalPlannerTest {
}
}
+ @Test
+ public void testCreateMultiTimeSeriesPlan() {
+ try {
+ TSCreateMultiTimeseriesReq req = new TSCreateMultiTimeseriesReq();
+ req.setPaths(
+ new ArrayList<String>() {
+ {
+ add("root.sg1.d2.s1");
+ add("root.sg1.d2.s2");
+ }
+ });
+ req.setMeasurementAliasList(
+ new ArrayList<String>() {
+ {
+ add("meter1");
+ add(null);
+ }
+ });
+ req.setDataTypes(
+ new ArrayList<Integer>() {
+ {
+ add(TSDataType.FLOAT.ordinal());
+ add(TSDataType.FLOAT.ordinal());
+ }
+ });
+ req.setEncodings(
+ new ArrayList<Integer>() {
+ {
+ add(TSEncoding.PLAIN.ordinal());
+ add(TSEncoding.PLAIN.ordinal());
+ }
+ });
+ req.setCompressors(
+ new ArrayList<Integer>() {
+ {
+ add(CompressionType.SNAPPY.ordinal());
+ add(CompressionType.SNAPPY.ordinal());
+ }
+ });
+ req.setAttributesList(
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("attr1", "a1");
+ }
+ });
+ add(null);
+ }
+ });
+ req.setTagsList(
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("tag1", "t1");
+ }
+ });
+ add(null);
+ }
+ });
+ CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement =
+ (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+ Analysis analysis = analyzer.analyze(createMultiTimeSeriesStatement);
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ CreateMultiTimeSeriesNode createMultiTimeSeriesNode =
+ (CreateMultiTimeSeriesNode) planner.plan(analysis).getRootNode();
+ // Test serialize and deserialize
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
+ createMultiTimeSeriesNode.serialize(byteBuffer);
+ byteBuffer.flip();
+ CreateMultiTimeSeriesNode createMultiTimeSeriesNode1 =
+ (CreateMultiTimeSeriesNode) PlanNodeDeserializeHelper.deserialize(byteBuffer);
+ Assert.assertEquals(createMultiTimeSeriesNode, createMultiTimeSeriesNode1);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
@Test
public void testAlterTimeseriesPlan() {
String sql = "ALTER timeseries root.turbine.d1.s1 RENAME 'tag1' TO 'newTag1'";
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index b20fa5364d..3fafea7cfd 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -252,6 +253,105 @@ public class InternalServiceImplTest {
Assert.assertTrue(response.accepted);
}
+ @Test
+ public void testCreateMultiTimeSeries() throws MetadataException {
+ CreateMultiTimeSeriesNode createMultiTimeSeriesNode =
+ new CreateMultiTimeSeriesNode(
+ new PlanNodeId("0"),
+ new ArrayList<PartialPath>() {
+ {
+ add(new PartialPath("root.ln.d3.s1"));
+ add(new PartialPath("root.ln.d3.s2"));
+ }
+ },
+ new ArrayList<TSDataType>() {
+ {
+ add(TSDataType.FLOAT);
+ add(TSDataType.FLOAT);
+ }
+ },
+ new ArrayList<TSEncoding>() {
+ {
+ add(TSEncoding.PLAIN);
+ add(TSEncoding.PLAIN);
+ }
+ },
+ new ArrayList<CompressionType>() {
+ {
+ add(CompressionType.SNAPPY);
+ add(CompressionType.SNAPPY);
+ }
+ },
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("MAX_POINT_NUMBER", "3");
+ }
+ });
+ add(null);
+ }
+ },
+ new ArrayList<String>() {
+ {
+ add("meter1");
+ add(null);
+ }
+ },
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("tag1", "t1");
+ }
+ });
+ add(null);
+ }
+ },
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("tag1", "t1");
+ }
+ });
+ add(null);
+ }
+ });
+
+ TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ PlanFragment planFragment =
+ new PlanFragment(new PlanFragmentId("2", 3), createMultiTimeSeriesNode);
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(
+ planFragment,
+ planFragment.getId().genFragmentInstanceId(),
+ new GroupByFilter(1, 2, 3, 4),
+ QueryType.WRITE);
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+
+ // serialize fragmentInstance
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip();
+
+ // put serialized fragmentInstance to TSendFragmentInstanceReq
+ TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+ TFragmentInstance tFragmentInstance = new TFragmentInstance();
+ tFragmentInstance.setBody(byteBuffer);
+ request.setFragmentInstance(tFragmentInstance);
+ request.setConsensusGroupId(regionReplicaSet.getRegionId());
+ request.setQueryType(QueryType.WRITE.toString());
+
+ // Use consensus layer to execute request
+ TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+ Assert.assertTrue(response.accepted);
+ }
+
private TRegionReplicaSet genRegionReplicaSet() {
List<TDataNodeLocation> dataNodeList = new ArrayList<>();
dataNodeList.add(