You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/13 07:55:17 UTC
[iotdb] 01/01: Support CreateMultiTimeseries in new cluster
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch multitimeseries
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79ef4a6e4ae5c4e8a7de82cc5b6a1c2318958510
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri May 13 15:54:54 2022 +0800
Support CreateMultiTimeseries in new cluster
---
.../metadata/visitor/SchemaExecutionVisitor.java | 50 +++
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 15 +
.../db/mpp/plan/parser/StatementGenerator.java | 32 ++
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 16 +
.../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../metedata/write/CreateMultiTimeSeriesNode.java | 409 +++++++++++++++++++++
.../db/mpp/plan/statement/StatementVisitor.java | 7 +
.../metadata/CreateMultiTimeSeriesStatement.java | 160 ++++++++
.../thrift/impl/DataNodeTSIServiceImpl.java | 43 ++-
10 files changed, 740 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 657f737b3e..6561eb6dee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -27,9 +27,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
/** Schema write PlanNode visitor */
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@@ -69,6 +72,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
+  /**
+   * Creates every timeseries carried by the node in the given schema region, one series at a
+   * time.
+   *
+   * <p>A {@link MetadataException} raised for one series is logged and recorded in the plan's
+   * result map; the remaining series are still attempted.
+   *
+   * @param node plan node listing the timeseries to create
+   * @param schemaRegion schema region the timeseries are created in
+   * @return a success status when no failure was recorded, otherwise a status built from the
+   *     plan's failing statuses
+   */
+  @Override
+  public TSStatus visitCreateMultiTimeSeries(
+      CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    CreateMultiTimeSeriesPlan multiPlan =
+        (CreateMultiTimeSeriesPlan)
+            node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+    final int seriesCount = multiPlan.getPaths().size();
+    for (int idx = 0; idx < seriesCount; idx++) {
+      // Only series that have neither a recorded result nor an executed flag are created.
+      boolean alreadyHandled =
+          multiPlan.getResults().containsKey(idx) || multiPlan.isExecuted(idx);
+      if (!alreadyHandled) {
+        CreateTimeSeriesPlan singlePlan = singleSeriesPlan(multiPlan, idx);
+        try {
+          schemaRegion.createTimeseries(singlePlan, -1);
+        } catch (MetadataException e) {
+          logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+          multiPlan.getResults().put(idx, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        }
+      }
+    }
+    if (multiPlan.getResults().isEmpty()) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+    }
+    return RpcUtils.getStatus(Arrays.asList(multiPlan.getFailingStatus()));
+  }
+
+  /** Builds the single-series plan for position {@code idx} of the multi-series plan. */
+  private static CreateTimeSeriesPlan singleSeriesPlan(
+      CreateMultiTimeSeriesPlan multiPlan, int idx) {
+    // Optional per-series lists may be absent as a whole; pass null for that series then.
+    return new CreateTimeSeriesPlan(
+        multiPlan.getPaths().get(idx),
+        multiPlan.getDataTypes().get(idx),
+        multiPlan.getEncodings().get(idx),
+        multiPlan.getCompressors().get(idx),
+        multiPlan.getProps() == null ? null : multiPlan.getProps().get(idx),
+        multiPlan.getTags() == null ? null : multiPlan.getTags().get(idx),
+        multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(idx),
+        multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(idx));
+  }
+
@Override
public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
@@ -143,6 +179,20 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
node.getTagsList(),
node.getAttributesList());
}
+
+    /**
+     * Converts a {@link CreateMultiTimeSeriesNode} into a {@link CreateMultiTimeSeriesPlan} by
+     * copying the node's per-series lists into a fresh plan.
+     *
+     * @param node source plan node
+     * @param context transformer context (not read here)
+     * @return the populated physical plan
+     */
+    public PhysicalPlan visitCreateMultiTimeSeries(
+        CreateMultiTimeSeriesNode node, TransformerContext context) {
+      CreateMultiTimeSeriesPlan converted = new CreateMultiTimeSeriesPlan();
+
+      // mandatory per-series columns
+      converted.setPaths(node.getPaths());
+      converted.setDataTypes(node.getDataTypes());
+      converted.setEncodings(node.getEncodings());
+      converted.setCompressors(node.getCompressors());
+
+      // optional per-series columns
+      converted.setAlias(node.getAliasList());
+      converted.setTags(node.getTagsList());
+      converted.setAttributes(node.getAttributesList());
+
+      return converted;
+    }
}
private static class TransformerContext {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index f52ba6fd77..5c698d1ff0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -854,6 +855,20 @@ public class Analyzer {
return analysis;
}
+  /**
+   * Analyzes a create-multi-timeseries statement: marks the query as a write and resolves (or
+   * creates) the schema partitions covering the statement's paths.
+   *
+   * @param createMultiTimeSeriesStatement statement listing the timeseries to create
+   * @param context query context whose query type is set to {@code WRITE}
+   * @return analysis holding the statement and its schema partition info
+   */
+  @Override
+  public Analysis visitCreateMultiTimeseries(
+      CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+    context.setQueryType(QueryType.WRITE);
+
+    PathPatternTree patternTree = new PathPatternTree(createMultiTimeSeriesStatement.getPaths());
+
+    Analysis result = new Analysis();
+    result.setStatement(createMultiTimeSeriesStatement);
+    result.setSchemaPartitionInfo(partitionFetcher.getOrCreateSchemaPartition(patternTree));
+    return result;
+  }
+
@Override
public Analysis visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 51726de6ca..a473299bb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.qp.strategy.SQLParseError;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -373,6 +375,36 @@ public class StatementGenerator {
return statement;
}
+  /**
+   * Builds a {@link CreateMultiTimeSeriesStatement} from a Thrift create-multi-timeseries
+   * request.
+   *
+   * <p>Data types, encodings and compressors arrive as integers and are used as indexes into
+   * the corresponding enum's {@code values()} array. Tags, attributes and aliases are passed
+   * through unchanged.
+   *
+   * @param req the Thrift request
+   * @return the populated statement
+   * @throws IllegalPathException if one of the request paths cannot be parsed
+   */
+  public static Statement createStatement(TSCreateMultiTimeseriesReq req)
+      throws IllegalPathException {
+    List<PartialPath> seriesPaths = new ArrayList<>();
+    for (String rawPath : req.paths) {
+      seriesPaths.add(new PartialPath(rawPath));
+    }
+
+    TSDataType[] knownTypes = TSDataType.values();
+    List<TSDataType> seriesTypes = new ArrayList<>();
+    for (int typeIndex : req.dataTypes) {
+      seriesTypes.add(knownTypes[typeIndex]);
+    }
+
+    TSEncoding[] knownEncodings = TSEncoding.values();
+    List<TSEncoding> seriesEncodings = new ArrayList<>();
+    for (int encodingIndex : req.encodings) {
+      seriesEncodings.add(knownEncodings[encodingIndex]);
+    }
+
+    CompressionType[] knownCompressors = CompressionType.values();
+    List<CompressionType> seriesCompressors = new ArrayList<>();
+    for (int compressorIndex : req.compressors) {
+      seriesCompressors.add(knownCompressors[compressorIndex]);
+    }
+
+    // construct create multi timeseries statement
+    CreateMultiTimeSeriesStatement result = new CreateMultiTimeSeriesStatement();
+    result.setPaths(seriesPaths);
+    result.setDataTypes(seriesTypes);
+    result.setEncodings(seriesEncodings);
+    result.setCompressors(seriesCompressors);
+    result.setTagsList(req.tagsList);
+    result.setAttributesList(req.attributesList);
+    result.setAliasList(req.measurementAliasList);
+    return result;
+  }
+
private static Statement invokeParser(String sql, ZoneId zoneId) {
ASTVisitor astVisitor = new ASTVisitor();
astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index befcdb71f4..97dfd8344e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -281,6 +283,20 @@ public class LogicalPlanner {
createAlignedTimeSeriesStatement.getAttributesList());
}
+  /**
+   * Plans a create-multi-timeseries statement as a single {@link CreateMultiTimeSeriesNode}
+   * carrying all of the statement's per-series lists.
+   *
+   * @param createMultiTimeSeriesStatement statement listing the timeseries to create
+   * @param context query context used to generate the plan node id
+   * @return the logical plan node for the statement
+   */
+  @Override
+  public PlanNode visitCreateMultiTimeseries(
+      CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
+    return new CreateMultiTimeSeriesNode(
+        context.getQueryId().genPlanNodeId(),
+        createMultiTimeSeriesStatement.getPaths(),
+        createMultiTimeSeriesStatement.getDataTypes(),
+        createMultiTimeSeriesStatement.getEncodings(),
+        createMultiTimeSeriesStatement.getCompressors(),
+        createMultiTimeSeriesStatement.getAliasList(),
+        createMultiTimeSeriesStatement.getTagsList(),
+        createMultiTimeSeriesStatement.getAttributesList());
+  }
+
@Override
public PlanNode visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 5662d0ca54..339445a86e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -98,7 +99,8 @@ public enum PlanNodeType {
ALIGNED_SERIES_AGGREGATE_SCAN((short) 34),
DEVICE_MERGE((short) 35),
SCHEMA_FETCH_MERGE((short) 36),
- TRANSFORM((short) 37);
+ TRANSFORM((short) 37),
+ CREATE_MULTI_TIME_SERIES((short) 38);
private final short nodeType;
@@ -198,6 +200,8 @@ public enum PlanNodeType {
return SchemaFetchMergeNode.deserialize(buffer);
case 37:
return TransformNode.deserialize(buffer);
+ case 38:
+ return CreateMultiTimeSeriesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8a3d3ec5be..c681a60a51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
@@ -184,6 +185,10 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ /**
+  * Visits a {@link CreateMultiTimeSeriesNode}. This default implementation delegates to
+  * {@code visitPlan}; subclasses override it to handle the node specifically.
+  */
+ public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
new file mode 100644
index 0000000000..8ba1167b76
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Write plan node that creates several timeseries in one request.
+ *
+ * <p>The series are stored column-wise: position {@code i} of {@link #getPaths()}, {@link
+ * #getDataTypes()}, {@link #getEncodings()} and {@link #getCompressors()} describes the same
+ * series. The alias, tags and attributes lists are optional; when present they are indexed the
+ * same way.
+ */
+public class CreateMultiTimeSeriesNode extends WritePlanNode {
+  private List<PartialPath> paths = new ArrayList<>();
+  private List<TSDataType> dataTypes = new ArrayList<>();
+  private List<TSEncoding> encodings = new ArrayList<>();
+  private List<CompressionType> compressors = new ArrayList<>();
+  private List<String> aliasList;
+  private List<Map<String, String>> tagsList;
+  private List<Map<String, String>> attributesList;
+  // Not written by serializeAttributes, but part of equals/hashCode.
+  private List<Long> tagOffsets;
+  private TRegionReplicaSet regionReplicaSet;
+
+  /** Creates an empty node; series are added with {@link #addTimeSeries}. */
+  public CreateMultiTimeSeriesNode(PlanNodeId id) {
+    super(id);
+  }
+
+  /**
+   * Creates a node from already-built per-series lists. The lists are stored by reference, not
+   * copied.
+   *
+   * @param id plan node id
+   * @param paths full paths of the timeseries
+   * @param dataTypes data type of each series
+   * @param encodings encoding of each series
+   * @param compressors compression type of each series
+   * @param aliasList alias of each series, may be {@code null}
+   * @param tagsList tags of each series, may be {@code null}
+   * @param attributesList attributes of each series, may be {@code null}
+   */
+  public CreateMultiTimeSeriesNode(
+      PlanNodeId id,
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> aliasList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList) {
+    super(id);
+    this.paths = paths;
+    this.dataTypes = dataTypes;
+    this.encodings = encodings;
+    this.compressors = compressors;
+    this.aliasList = aliasList;
+    this.tagsList = tagsList;
+    this.attributesList = attributesList;
+  }
+
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<PartialPath> paths) {
+    this.paths = paths;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  public List<Long> getTagOffsets() {
+    return tagOffsets;
+  }
+
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
+  /**
+   * Appends one timeseries to this node.
+   *
+   * <p>The optional lists (alias, tags, attributes) are kept index-aligned with {@code paths}:
+   * once such a list holds any value, a {@code null} is stored for series that have none, so
+   * position {@code i} always refers to the i-th path. A list that is still unset or empty
+   * stays untouched while only {@code null} values are added.
+   *
+   * @param path full path of the series
+   * @param dataType data type of the series
+   * @param encoding encoding of the series
+   * @param compressor compression type of the series
+   * @param alias alias of the series, or {@code null}
+   * @param tags tags of the series, or {@code null}
+   * @param attributes attributes of the series, or {@code null}
+   */
+  public void addTimeSeries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes) {
+    // Position the new series will occupy in every per-series list.
+    int index = this.paths.size();
+    this.paths.add(path);
+    this.dataTypes.add(dataType);
+    this.encodings.add(encoding);
+    this.compressors.add(compressor);
+    this.aliasList = appendAligned(this.aliasList, alias, index);
+    this.tagsList = appendAligned(this.tagsList, tags, index);
+    this.attributesList = appendAligned(this.attributesList, attributes, index);
+  }
+
+  /**
+   * Stores {@code value} at position {@code index} of an optional per-series list, padding any
+   * gap with {@code null}. Returns the list unchanged when it is unset or empty and the value is
+   * {@code null}.
+   */
+  private static <T> List<T> appendAligned(List<T> list, T value, int index) {
+    if (value == null && (list == null || list.isEmpty())) {
+      return list;
+    }
+    List<T> target = list == null ? new ArrayList<>() : list;
+    // NOTE(review): null entries only appear when some series have a value and others do not;
+    // confirm ReadWriteIOUtils writes/reads null strings and maps before relying on that case.
+    while (target.size() < index) {
+      target.add(null);
+    }
+    target.add(value);
+    return target;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {}
+
+  @Override
+  public PlanNode clone() {
+    throw new NotImplementedException("Clone of CreateMultiTimeSeriesNode is not implemented");
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+    return visitor.visitCreateMultiTimeSeries(this, schemaRegion);
+  }
+
+  /**
+   * Reads a node from the buffer, mirroring the layout written by {@link
+   * #serializeAttributes}: series count, paths, data types, encodings, compressors, then one
+   * labelled section each for aliases, tags and attributes, and finally the plan node id.
+   *
+   * <p>Section labels: {@code -1} leaves the list {@code null}, {@code 0} yields an empty list,
+   * {@code 1} is followed by one entry per series.
+   *
+   * @param byteBuffer buffer positioned at the series count (presumably just after the node
+   *     type marker — verify against the caller)
+   * @return the deserialized node
+   * @throws IllegalArgumentException if a serialized path is not a legal path
+   */
+  public static CreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
+    String id;
+    List<PartialPath> paths;
+    List<TSDataType> dataTypes;
+    List<TSEncoding> encodings;
+    List<CompressionType> compressors;
+    List<String> aliasList = null;
+    List<Map<String, String>> tagsList = null;
+    List<Map<String, String>> attributesList = null;
+
+    int size = byteBuffer.getInt();
+    paths = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      try {
+        paths.add(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+      } catch (IllegalPathException e) {
+        throw new IllegalArgumentException("Can not deserialize CreateMultiTimeSeriesNode", e);
+      }
+    }
+
+    dataTypes = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      dataTypes.add(TSDataType.values()[byteBuffer.get()]);
+    }
+
+    encodings = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      encodings.add(TSEncoding.values()[byteBuffer.get()]);
+    }
+
+    compressors = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      compressors.add(CompressionType.values()[byteBuffer.get()]);
+    }
+
+    // alias
+    byte label = byteBuffer.get();
+    if (label >= 0) {
+      aliasList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          aliasList.add(ReadWriteIOUtils.readString(byteBuffer));
+        }
+      }
+    }
+
+    // tags
+    label = byteBuffer.get();
+    if (label >= 0) {
+      tagsList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          tagsList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    // attributes
+    label = byteBuffer.get();
+    if (label >= 0) {
+      attributesList = new ArrayList<>();
+      if (label == 1) {
+        for (int i = 0; i < size; i++) {
+          attributesList.add(ReadWriteIOUtils.readMap(byteBuffer));
+        }
+      }
+    }
+
+    id = ReadWriteIOUtils.readString(byteBuffer);
+
+    return new CreateMultiTimeSeriesNode(
+        new PlanNodeId(id),
+        paths,
+        dataTypes,
+        encodings,
+        compressors,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreateMultiTimeSeriesNode that = (CreateMultiTimeSeriesNode) o;
+    return this.getPlanNodeId().equals(that.getPlanNodeId())
+        && Objects.equals(paths, that.paths)
+        && Objects.equals(dataTypes, that.dataTypes)
+        && Objects.equals(encodings, that.encodings)
+        && Objects.equals(compressors, that.compressors)
+        && Objects.equals(tagOffsets, that.tagOffsets)
+        && Objects.equals(aliasList, that.aliasList)
+        && Objects.equals(tagsList, that.tagsList)
+        && Objects.equals(attributesList, that.attributesList);
+  }
+
+  /**
+   * Writes the node type marker followed by the per-series columns. Each optional list is
+   * preceded by a one-byte label: {@code -1} for {@code null}, {@code 0} for empty, {@code 1}
+   * when entries follow.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.CREATE_MULTI_TIME_SERIES.serialize(byteBuffer);
+
+    // paths
+    byteBuffer.putInt(paths.size());
+    for (PartialPath path : paths) {
+      ReadWriteIOUtils.write(path.getFullPath(), byteBuffer);
+    }
+
+    // dataTypes
+    for (TSDataType dataType : dataTypes) {
+      byteBuffer.put((byte) dataType.ordinal());
+    }
+
+    // encodings
+    for (TSEncoding encoding : encodings) {
+      byteBuffer.put((byte) encoding.ordinal());
+    }
+
+    // compressors
+    for (CompressionType compressor : compressors) {
+      byteBuffer.put((byte) compressor.ordinal());
+    }
+
+    // alias
+    if (aliasList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (aliasList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (String alias : aliasList) {
+        ReadWriteIOUtils.write(alias, byteBuffer);
+      }
+    }
+
+    // tags
+    if (tagsList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (tagsList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> tags : tagsList) {
+        ReadWriteIOUtils.write(tags, byteBuffer);
+      }
+    }
+
+    // attributes
+    if (attributesList == null) {
+      byteBuffer.put((byte) -1);
+    } else if (attributesList.isEmpty()) {
+      byteBuffer.put((byte) 0);
+    } else {
+      byteBuffer.put((byte) 1);
+      for (Map<String, String> attributes : attributesList) {
+        ReadWriteIOUtils.write(attributes, byteBuffer);
+      }
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        this.getPlanNodeId(),
+        paths,
+        dataTypes,
+        encodings,
+        compressors,
+        tagOffsets,
+        aliasList,
+        tagsList,
+        attributesList);
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  /**
+   * Splits this node into one node per schema region replica set, grouping the series by the
+   * replica set that owns each series' device.
+   *
+   * <p>Every resulting node reuses this node's plan node id and carries its target replica set.
+   *
+   * @param analysis analysis providing the schema partition info
+   * @return one node per distinct replica set
+   */
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    Map<TRegionReplicaSet, CreateMultiTimeSeriesNode> splitMap = new HashMap<>();
+    for (int i = 0; i < paths.size(); i++) {
+      TRegionReplicaSet targetReplicaSet =
+          analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(paths.get(i).getDevice());
+      CreateMultiTimeSeriesNode tmpNode = splitMap.get(targetReplicaSet);
+      if (tmpNode == null) {
+        tmpNode = new CreateMultiTimeSeriesNode(this.getPlanNodeId());
+        tmpNode.setRegionReplicaSet(targetReplicaSet);
+        splitMap.put(targetReplicaSet, tmpNode);
+      }
+      // Each optional list is checked on its own: tags must not depend on attributes being set.
+      tmpNode.addTimeSeries(
+          paths.get(i),
+          dataTypes.get(i),
+          encodings.get(i),
+          compressors.get(i),
+          optionalElement(aliasList, i),
+          optionalElement(tagsList, i),
+          optionalElement(attributesList, i));
+    }
+    return new ArrayList<>(splitMap.values());
+  }
+
+  /**
+   * Returns element {@code index} of an optional per-series list, or {@code null} when the list
+   * is unset or empty (both mean "no values supplied").
+   */
+  private static <T> T optionalElement(List<T> list, int index) {
+    return list == null || list.isEmpty() ? null : list.get(index);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 59f16004b3..59d11c3c9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
@@ -77,6 +78,12 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(createAlignedTimeSeriesStatement, context);
}
+ // Create Multi Timeseries
+ /**
+  * Visits a {@link CreateMultiTimeSeriesStatement}. This default implementation delegates to
+  * {@code visitStatement}; subclasses override it to handle the statement specifically.
+  */
+ public R visitCreateMultiTimeseries(
+ CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, C context) {
+ return visitStatement(createMultiTimeSeriesStatement, context);
+ }
+
// Alter Timeseries
public R visitAlterTimeseries(AlterTimeSeriesStatement alterTimeSeriesStatement, C context) {
return visitStatement(alterTimeSeriesStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
new file mode 100644
index 0000000000..341e4d12a1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CREATE MULTI TIMESERIES statement.
+ *
+ * <p>Holds the timeseries to create column-wise: position {@code i} of each list describes the
+ * same series. {@code paths} has no default and must be supplied through {@link #setPaths}.
+ */
+public class CreateMultiTimeSeriesStatement extends Statement {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(CreateMultiTimeSeriesStatement.class);
+
+  private List<PartialPath> paths;
+  private List<TSDataType> dataTypes = new ArrayList<>();
+  private List<TSEncoding> encodings = new ArrayList<>();
+  private List<CompressionType> compressors = new ArrayList<>();
+  private List<String> aliasList = new ArrayList<>();
+  private List<Map<String, String>> tagsList = new ArrayList<>();
+  private List<Map<String, String>> attributesList = new ArrayList<>();
+  // Created lazily by getTagOffsets() or addTagOffsets().
+  private List<Long> tagOffsets = null;
+
+  public CreateMultiTimeSeriesStatement() {
+    super();
+    statementType = StatementType.CREATE_MULTI_TIMESERIES;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  public void setPaths(List<PartialPath> paths) {
+    this.paths = paths;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public void addDataType(TSDataType dataType) {
+    this.dataTypes.add(dataType);
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<TSEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public void addEncoding(TSEncoding encoding) {
+    this.encodings.add(encoding);
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  public void setCompressors(List<CompressionType> compressors) {
+    this.compressors = compressors;
+  }
+
+  public void addCompressor(CompressionType compression) {
+    this.compressors.add(compression);
+  }
+
+  public List<String> getAliasList() {
+    return aliasList;
+  }
+
+  public void setAliasList(List<String> aliasList) {
+    this.aliasList = aliasList;
+  }
+
+  public void addAliasList(String alias) {
+    this.aliasList.add(alias);
+  }
+
+  public List<Map<String, String>> getTagsList() {
+    return tagsList;
+  }
+
+  public void setTagsList(List<Map<String, String>> tagsList) {
+    this.tagsList = tagsList;
+  }
+
+  public void addTagsList(Map<String, String> tags) {
+    this.tagsList.add(tags);
+  }
+
+  public List<Map<String, String>> getAttributesList() {
+    return attributesList;
+  }
+
+  public void setAttributesList(List<Map<String, String>> attributesList) {
+    this.attributesList = attributesList;
+  }
+
+  public void addAttributesList(Map<String, String> attributes) {
+    this.attributesList.add(attributes);
+  }
+
+  /**
+   * Returns the tag offsets. If none have been set or added yet, the list is created on first
+   * access with one {@code -1} entry per path.
+   *
+   * @return the tag offsets list
+   */
+  public List<Long> getTagOffsets() {
+    if (tagOffsets == null) {
+      tagOffsets = new ArrayList<>();
+      for (int i = 0; i < paths.size(); i++) {
+        tagOffsets.add(-1L);
+      }
+    }
+    return tagOffsets;
+  }
+
+  public void setTagOffsets(List<Long> tagOffsets) {
+    this.tagOffsets = tagOffsets;
+  }
+
+  /**
+   * Appends one tag offset. The backing list starts out {@code null}, so it is created here
+   * when needed instead of failing with a {@link NullPointerException}. A list created this
+   * way holds only the offsets added explicitly, without the {@code -1} defaults that {@link
+   * #getTagOffsets()} would generate.
+   *
+   * @param tagsOffset offset to append
+   */
+  public void addTagOffsets(Long tagsOffset) {
+    if (this.tagOffsets == null) {
+      this.tagOffsets = new ArrayList<>();
+    }
+    this.tagOffsets.add(tagsOffset);
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateMultiTimeseries(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index c9635fced0..f7f863a33c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.query.control.SessionManager;
@@ -367,7 +368,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
req.getMeasurements());
}
- // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement
+ // Step 1: transfer from CreateAlignedTimeSeriesReq to Statement
CreateAlignedTimeSeriesStatement statement =
(CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
@@ -397,7 +398,45 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
@Override
public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
- throw new UnsupportedOperationException();
+ // Handles a create-multi-timeseries request: rejects sessions that are not logged in, turns
+ // the request into a CreateMultiTimeSeriesStatement, checks authority, and passes the
+ // statement to the coordinator. Returns the not-logged-in status, the failed authority status,
+ // the coordinator's result status, or - if anything throws - the status produced by
+ // onNPEOrUnexpectedException.
+ try {
+   if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+     return getNotLoggedInStatus();
+   }
+
+   if (AUDIT_LOGGER.isDebugEnabled()) {
+     // Guard the "first path" lookup: with an empty path list get(0) would throw, making the
+     // outcome of the request depend on whether debug logging is enabled.
+     AUDIT_LOGGER.debug(
+         "Session-{} create {} timeseries, the first is {}",
+         SESSION_MANAGER.getCurrSessionId(),
+         req.getPaths().size(),
+         req.getPaths().isEmpty() ? null : req.getPaths().get(0));
+   }
+
+   // Step 1: transfer from CreateMultiTimeSeriesReq to Statement
+   CreateMultiTimeSeriesStatement statement =
+       (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+   // permission check
+   TSStatus status = AuthorityChecker.checkAuthority(statement, req.getSessionId());
+   if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return status;
+   }
+
+   // Step 2: call the coordinator
+   long queryId = SESSION_MANAGER.requestQueryId(false);
+   ExecutionResult result =
+       COORDINATOR.execute(
+           statement,
+           new QueryId(String.valueOf(queryId)),
+           SESSION_MANAGER.getSessionInfo(req.getSessionId()),
+           "",
+           PARTITION_FETCHER,
+           SCHEMA_FETCHER);
+
+   return result.status;
+ } catch (Exception e) {
+   // NOTE(review): failures are reported as CREATE_TIMESERIES; confirm whether OperationType
+   // offers a dedicated multi-timeseries constant that should be used here instead.
+   return onNPEOrUnexpectedException(
+       e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
}
@Override