You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/28 06:10:15 UTC
[iotdb] branch master updated: [IOTDB-2967] New writing process of cluster (#5656)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a9fd419fbd [IOTDB-2967] New writing process of cluster (#5656)
a9fd419fbd is described below
commit a9fd419fbd339591791dde185f856eb117169afc
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Apr 28 14:10:10 2022 +0800
[IOTDB-2967] New writing process of cluster (#5656)
---
.../apache/iotdb/db/mpp/execution/Coordinator.java | 2 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 130 +-----
.../iotdb/db/mpp/sql/analyze/SchemaValidator.java | 57 +++
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 154 +++----
.../plan/node/write/InsertMultiTabletsNode.java | 89 +++-
.../sql/planner/plan/node/write/InsertNode.java | 77 ++--
.../sql/planner/plan/node/write/InsertRowNode.java | 365 ++++++++++-----
.../planner/plan/node/write/InsertRowsNode.java | 89 +++-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 75 ++-
.../planner/plan/node/write/InsertTabletNode.java | 505 +++++++++++----------
.../db/mpp/sql/statement/crud/BatchInsert.java | 37 ++
.../sql/statement/crud/InsertBaseStatement.java | 31 --
.../crud/InsertMultiTabletsStatement.java | 11 -
.../mpp/sql/statement/crud/InsertRowStatement.java | 94 ----
.../crud/InsertRowsOfOneDeviceStatement.java | 18 -
.../sql/statement/crud/InsertRowsStatement.java | 18 -
.../sql/statement/crud/InsertTabletStatement.java | 51 +--
.../service/thrift/impl/InternalServiceImpl.java | 25 +-
.../db/engine/storagegroup/DataRegionTest.java | 44 +-
.../engine/storagegroup/TsFileProcessorV2Test.java | 25 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 33 +-
.../write/InsertMultiTabletsNodeSerdeTest.java | 101 +++++
.../plan/node/write/InsertRowNodeSerdeTest.java | 134 ++++--
.../plan/node/write/InsertRowsNodeSerdeTest.java | 72 +++
.../write/InsertRowsOfOneDeviceNodeSerdeTest.java | 74 +++
.../plan/node/write/InsertTabletNodeSerdeTest.java | 96 +++-
26 files changed, 1524 insertions(+), 883 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 7e0c78b268..9f02665c0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -49,7 +49,7 @@ public class Coordinator {
private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
- private static final int COORDINATOR_EXECUTOR_SIZE = 1;
+ private static final int COORDINATOR_EXECUTOR_SIZE = 10;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index a0c55fdcf0..fb2cfdf6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -299,68 +298,36 @@ public class Analyzer {
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
- insertTabletStatement.getDevicePath(),
- insertTabletStatement.getMeasurements(),
- insertTabletStatement.getDataTypes(),
- insertTabletStatement.isAligned());
-
- if (!insertTabletStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
insertTabletStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap.put(
- schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
- Collections.singletonList(dataPartitionQueryParam));
- DataPartition dataPartition;
- dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(
+ Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
analysis.setStatement(insertTabletStatement);
analysis.setDataPartitionInfo(dataPartition);
+
return analysis;
}
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
- insertRowStatement.getDevicePath(),
- insertRowStatement.getMeasurements(),
- insertRowStatement.getDataTypes(),
- insertRowStatement.isAligned());
-
- try {
- insertRowStatement.transferType(schemaTree);
- } catch (QueryProcessException e) {
- throw new SemanticException(e.getMessage());
- }
- if (!insertRowStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap.put(
- schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
- Collections.singletonList(dataPartitionQueryParam));
+
DataPartition dataPartition =
- partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
+ partitionFetcher.getOrCreateDataPartition(
+ Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
analysis.setStatement(insertRowStatement);
analysis.setDataPartitionInfo(dataPartition);
@@ -371,41 +338,21 @@ public class Analyzer {
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaListWithAutoCreate(
- insertRowsStatement.getDevicePaths(),
- insertRowsStatement.getMeasurementsList(),
- insertRowsStatement.getDataTypesList(),
- insertRowsStatement.getAlignedList());
- try {
- insertRowsStatement.transferType(schemaTree);
- } catch (QueryProcessException e) {
- throw new SemanticException(e.getMessage());
- }
-
- if (!insertRowsStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
insertRowStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
- key -> new ArrayList<>())
- .add(dataPartitionQueryParam);
+ dataPartitionQueryParams.add(dataPartitionQueryParam);
}
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
analysis.setStatement(insertRowsStatement);
analysis.setDataPartitionInfo(dataPartition);
@@ -416,35 +363,21 @@ public class Analyzer {
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaListWithAutoCreate(
- insertMultiTabletsStatement.getDevicePaths(),
- insertMultiTabletsStatement.getMeasurementsList(),
- insertMultiTabletsStatement.getDataTypesList(),
- insertMultiTabletsStatement.getAlignedList());
-
- if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
for (InsertTabletStatement insertTabletStatement :
insertMultiTabletsStatement.getInsertTabletStatementList()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
insertTabletStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
- key -> new ArrayList<>())
- .add(dataPartitionQueryParam);
+ dataPartitionQueryParams.add(dataPartitionQueryParam);
}
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
analysis.setStatement(insertMultiTabletsStatement);
analysis.setDataPartitionInfo(dataPartition);
@@ -455,37 +388,18 @@ public class Analyzer {
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- // TODO remove duplicate
- SchemaTree schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
- insertRowsOfOneDeviceStatement.getDevicePath(),
- insertRowsOfOneDeviceStatement.getMeasurements(),
- insertRowsOfOneDeviceStatement.getDataTypes(),
- insertRowsOfOneDeviceStatement.isAligned());
- try {
- insertRowsOfOneDeviceStatement.transferType(schemaTree);
- } catch (QueryProcessException e) {
- throw new SemanticException(e.getMessage());
- }
-
- if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
- throw new SemanticException("Data type mismatch");
- }
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(
insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
insertRowsOfOneDeviceStatement.getTimePartitionSlots());
- sgNameToQueryParamsMap.put(
- schemaTree.getBelongedStorageGroup(insertRowsOfOneDeviceStatement.getDevicePath()),
- Collections.singletonList(dataPartitionQueryParam));
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(
+ Collections.singletonList(dataPartitionQueryParam));
Analysis analysis = new Analysis();
- analysis.setSchemaTree(schemaTree);
analysis.setStatement(insertRowsOfOneDeviceStatement);
analysis.setDataPartitionInfo(dataPartition);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
new file mode 100644
index 0000000000..153da87d8e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
+
+public class SchemaValidator {
+
+ private static final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+
+ public static SchemaTree validate(InsertNode insertNode) {
+
+ SchemaTree schemaTree;
+ if (insertNode instanceof BatchInsert) {
+ BatchInsert batchInsert = (BatchInsert) insertNode;
+ schemaTree =
+ schemaFetcher.fetchSchemaListWithAutoCreate(
+ batchInsert.getDevicePaths(),
+ batchInsert.getMeasurementsList(),
+ batchInsert.getDataTypesList(),
+ batchInsert.getAlignedList());
+ } else {
+ schemaTree =
+ schemaFetcher.fetchSchemaWithAutoCreate(
+ insertNode.getDevicePath(),
+ insertNode.getMeasurements(),
+ insertNode.getDataTypes(),
+ insertNode.isAligned());
+ }
+
+ if (!insertNode.validateSchema(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ return schemaTree;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 230bfc8d3f..6e752421b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
@@ -31,6 +30,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSe
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
@@ -57,10 +57,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -231,20 +228,12 @@ public class LogicalPlanner {
@Override
public PlanNode visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
- // set schema in insert node
// convert insert statement to insert node
- DeviceSchemaInfo deviceSchemaInfo =
- analysis
- .getSchemaTree()
- .searchDeviceSchemaInfo(
- insertTabletStatement.getDevicePath(),
- Arrays.asList(insertTabletStatement.getMeasurements()));
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
return new InsertTabletNode(
context.getQueryId().genPlanNodeId(),
insertTabletStatement.getDevicePath(),
insertTabletStatement.isAligned(),
- measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertTabletStatement.getMeasurements(),
insertTabletStatement.getDataTypes(),
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
@@ -254,60 +243,16 @@ public class LogicalPlanner {
@Override
public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
- // set schema in insert node
// convert insert statement to insert node
- DeviceSchemaInfo deviceSchemaInfo =
- analysis
- .getSchemaTree()
- .searchDeviceSchemaInfo(
- insertRowStatement.getDevicePath(),
- Arrays.asList(insertRowStatement.getMeasurements()));
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
return new InsertRowNode(
context.getQueryId().genPlanNodeId(),
insertRowStatement.getDevicePath(),
insertRowStatement.isAligned(),
- measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertRowStatement.getMeasurements(),
insertRowStatement.getDataTypes(),
insertRowStatement.getTime(),
- insertRowStatement.getValues());
- }
-
- @Override
- public PlanNode visitShowTimeSeries(
- ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
- QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
- planBuilder.planTimeSeriesMetaSource(
- showTimeSeriesStatement.getPathPattern(),
- showTimeSeriesStatement.getKey(),
- showTimeSeriesStatement.getValue(),
- showTimeSeriesStatement.getLimit(),
- showTimeSeriesStatement.getOffset(),
- showTimeSeriesStatement.isOrderByHeat(),
- showTimeSeriesStatement.isContains(),
- showTimeSeriesStatement.isPrefixPath());
- planBuilder.planSchemaMerge(showTimeSeriesStatement.isOrderByHeat());
- if (showTimeSeriesStatement.getLimit() > 0) {
- planBuilder.planOffset(showTimeSeriesStatement.getOffset());
- planBuilder.planLimit(showTimeSeriesStatement.getLimit());
- }
- return planBuilder.getRoot();
- }
-
- @Override
- public PlanNode visitShowDevices(
- ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
- QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
- planBuilder.planDeviceSchemaSource(
- showDevicesStatement.getPathPattern(),
- showDevicesStatement.getLimit(),
- showDevicesStatement.getOffset(),
- showDevicesStatement.isPrefixPath(),
- showDevicesStatement.hasSgCol());
- planBuilder.planSchemaMerge(false);
- planBuilder.planOffset(showDevicesStatement.getOffset());
- planBuilder.planLimit(showDevicesStatement.getLimit());
- return planBuilder.getRoot();
+ insertRowStatement.getValues(),
+ insertRowStatement.isNeedInferType());
}
@Override
@@ -345,28 +290,21 @@ public class LogicalPlanner {
@Override
public PlanNode visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
- // set schema in insert node
// convert insert statement to insert node
InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
InsertRowStatement insertRowStatement =
insertRowsStatement.getInsertRowStatementList().get(i);
- DeviceSchemaInfo deviceSchemaInfo =
- analysis
- .getSchemaTree()
- .searchDeviceSchemaInfo(
- insertRowStatement.getDevicePath(),
- Arrays.asList(insertRowStatement.getMeasurements()));
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
insertRowsNode.addOneInsertRowNode(
new InsertRowNode(
insertRowsNode.getPlanNodeId(),
insertRowStatement.getDevicePath(),
insertRowStatement.isAligned(),
- measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertRowStatement.getMeasurements(),
insertRowStatement.getDataTypes(),
insertRowStatement.getTime(),
- insertRowStatement.getValues()),
+ insertRowStatement.getValues(),
+ insertRowStatement.isNeedInferType()),
i);
}
return insertRowsNode;
@@ -375,65 +313,87 @@ public class LogicalPlanner {
@Override
public PlanNode visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
- // set schema in insert node
// convert insert statement to insert node
InsertMultiTabletsNode insertMultiTabletsNode =
new InsertMultiTabletsNode(context.getQueryId().genPlanNodeId());
- List<InsertTabletNode> insertTabletNodeList = new ArrayList<>();
for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) {
InsertTabletStatement insertTabletStatement =
insertMultiTabletsStatement.getInsertTabletStatementList().get(i);
- DeviceSchemaInfo deviceSchemaInfo =
- analysis
- .getSchemaTree()
- .searchDeviceSchemaInfo(
- insertTabletStatement.getDevicePath(),
- Arrays.asList(insertTabletStatement.getMeasurements()));
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
- insertTabletNodeList.add(
+ insertMultiTabletsNode.addInsertTabletNode(
new InsertTabletNode(
insertMultiTabletsNode.getPlanNodeId(),
insertTabletStatement.getDevicePath(),
insertTabletStatement.isAligned(),
- measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertTabletStatement.getMeasurements(),
insertTabletStatement.getDataTypes(),
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount()));
+ insertTabletStatement.getRowCount()),
+ i);
}
- insertMultiTabletsNode.setInsertTabletNodeList(insertTabletNodeList);
return insertMultiTabletsNode;
}
@Override
public PlanNode visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
- // set schema in insert node
// convert insert statement to insert node
- InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+ InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode =
+ new InsertRowsOfOneDeviceNode(context.getQueryId().genPlanNodeId());
for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
InsertRowStatement insertRowStatement =
insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
- DeviceSchemaInfo deviceSchemaInfo =
- analysis
- .getSchemaTree()
- .searchDeviceSchemaInfo(
- insertRowStatement.getDevicePath(),
- Arrays.asList(insertRowStatement.getMeasurements()));
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
- insertRowsNode.addOneInsertRowNode(
+ insertRowsOfOneDeviceNode.addOneInsertRowNode(
new InsertRowNode(
- insertRowsNode.getPlanNodeId(),
+ insertRowsOfOneDeviceNode.getPlanNodeId(),
insertRowStatement.getDevicePath(),
insertRowStatement.isAligned(),
- measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertRowStatement.getMeasurements(),
insertRowStatement.getDataTypes(),
insertRowStatement.getTime(),
- insertRowStatement.getValues()),
+ insertRowStatement.getValues(),
+ insertRowStatement.isNeedInferType()),
i);
}
- return insertRowsNode;
+ return insertRowsOfOneDeviceNode;
+ }
+
+ @Override
+ public PlanNode visitShowTimeSeries(
+ ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+ QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+ planBuilder.planTimeSeriesMetaSource(
+ showTimeSeriesStatement.getPathPattern(),
+ showTimeSeriesStatement.getKey(),
+ showTimeSeriesStatement.getValue(),
+ showTimeSeriesStatement.getLimit(),
+ showTimeSeriesStatement.getOffset(),
+ showTimeSeriesStatement.isOrderByHeat(),
+ showTimeSeriesStatement.isContains(),
+ showTimeSeriesStatement.isPrefixPath());
+ planBuilder.planSchemaMerge(showTimeSeriesStatement.isOrderByHeat());
+ if (showTimeSeriesStatement.getLimit() > 0) {
+ planBuilder.planOffset(showTimeSeriesStatement.getOffset());
+ planBuilder.planLimit(showTimeSeriesStatement.getLimit());
+ }
+ return planBuilder.getRoot();
+ }
+
+ @Override
+ public PlanNode visitShowDevices(
+ ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+ QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+ planBuilder.planDeviceSchemaSource(
+ showDevicesStatement.getPathPattern(),
+ showDevicesStatement.getLimit(),
+ showDevicesStatement.getOffset(),
+ showDevicesStatement.isPrefixPath(),
+ showDevicesStatement.hasSgCol());
+ planBuilder.planSchemaMerge(false);
+ planBuilder.planOffset(showDevicesStatement.getOffset());
+ planBuilder.planLimit(showDevicesStatement.getLimit());
+ return planBuilder.getRoot();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 2c15674023..49088f418d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -21,11 +21,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -36,7 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class InsertMultiTabletsNode extends InsertNode {
+public class InsertMultiTabletsNode extends InsertNode implements BatchInsert {
/**
* the value is used to indict the parent InsertTabletNode's index when the parent
@@ -114,6 +118,16 @@ public class InsertMultiTabletsNode extends InsertNode {
parentInsertTabletNodeIndexList.add(parentIndex);
}
+ @Override
+ public boolean validateSchema(SchemaTree schemaTree) {
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ if (!insertTabletNode.validateSchema(schemaTree)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
@@ -177,12 +191,81 @@ public class InsertMultiTabletsNode extends InsertNode {
return null;
}
+ @Override
+ public List<PartialPath> getDevicePaths() {
+ List<PartialPath> partialPaths = new ArrayList<>();
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ partialPaths.add(insertTabletNode.devicePath);
+ }
+ return partialPaths;
+ }
+
+ @Override
+ public List<String[]> getMeasurementsList() {
+ List<String[]> measurementsList = new ArrayList<>();
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ measurementsList.add(insertTabletNode.measurements);
+ }
+ return measurementsList;
+ }
+
+ @Override
+ public List<TSDataType[]> getDataTypesList() {
+ List<TSDataType[]> dataTypesList = new ArrayList<>();
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ dataTypesList.add(insertTabletNode.dataTypes);
+ }
+ return dataTypesList;
+ }
+
+ @Override
+ public List<Boolean> getAlignedList() {
+ List<Boolean> alignedList = new ArrayList<>();
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ alignedList.add(insertTabletNode.isAligned);
+ }
+ return alignedList;
+ }
+
public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ PlanNodeId planNodeId;
+ List<InsertTabletNode> insertTabletNodeList = new ArrayList<>();
+ List<Integer> parentIndex = new ArrayList<>();
+
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; i++) {
+ InsertTabletNode insertTabletNode = new InsertTabletNode(new PlanNodeId(""));
+ insertTabletNode.subDeserialize(byteBuffer);
+ insertTabletNodeList.add(insertTabletNode);
+ }
+ for (int i = 0; i < size; i++) {
+ parentIndex.add(byteBuffer.getInt());
+ }
+
+ planNodeId = PlanNodeId.deserialize(byteBuffer);
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ insertTabletNode.setPlanNodeId(planNodeId);
+ }
+
+ InsertMultiTabletsNode insertMultiTabletsNode = new InsertMultiTabletsNode(planNodeId);
+ insertMultiTabletsNode.setInsertTabletNodeList(insertTabletNodeList);
+ insertMultiTabletsNode.setParentInsertTabletNodeIndexList(parentIndex);
+ return insertMultiTabletsNode;
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INSERT_MULTI_TABLET.serialize(byteBuffer);
+
+ byteBuffer.putInt(insertTabletNodeList.size());
+
+ for (InsertTabletNode node : insertTabletNodeList) {
+ node.subSerialize(byteBuffer);
+ }
+ for (Integer index : parentInsertTabletNodeIndexList) {
+ byteBuffer.putInt(index);
+ }
+ }
@Override
public boolean equals(Object o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 32520a924b..14b1bf1a23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -72,20 +74,13 @@ public abstract class InsertNode extends WritePlanNode {
PlanNodeId id,
PartialPath devicePath,
boolean isAligned,
- MeasurementSchema[] measurementSchemas,
+ String[] measurements,
TSDataType[] dataTypes) {
super(id);
this.devicePath = devicePath;
this.isAligned = isAligned;
- this.measurementSchemas = measurementSchemas;
+ this.measurements = measurements;
this.dataTypes = dataTypes;
-
- this.measurements = new String[measurementSchemas.length];
- for (int i = 0; i < measurementSchemas.length; i++) {
- if (measurementSchemas[i] != null) {
- measurements[i] = measurementSchemas[i].getMeasurementId();
- }
- }
}
public TRegionReplicaSet getDataRegionReplicaSet() {
@@ -142,27 +137,23 @@ public abstract class InsertNode extends WritePlanNode {
protected void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
for (MeasurementSchema measurementSchema : measurementSchemas) {
- if (measurementSchema != null) {
- WALWriteUtils.write(measurementSchema, buffer);
- }
+ WALWriteUtils.write(measurementSchema, buffer);
}
}
protected int serializeMeasurementSchemaSize() {
int byteLen = 0;
for (MeasurementSchema measurementSchema : measurementSchemas) {
- if (measurementSchema != null) {
- byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
- byteLen += 3 * Byte.BYTES;
- Map<String, String> props = measurementSchema.getProps();
- if (props == null) {
- byteLen += Integer.BYTES;
- } else {
- byteLen += Integer.BYTES;
- for (Map.Entry<String, String> entry : props.entrySet()) {
- byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
- byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
- }
+ byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
+ byteLen += 3 * Byte.BYTES;
+ Map<String, String> props = measurementSchema.getProps();
+ if (props == null) {
+ byteLen += Integer.BYTES;
+ } else {
+ byteLen += Integer.BYTES;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
+ byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
}
}
}
@@ -201,19 +192,37 @@ public abstract class InsertNode extends WritePlanNode {
return dataRegionReplicaSet;
}
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
+ public abstract boolean validateSchema(SchemaTree schemaTree);
+
+ public void setMeasurementSchemas(SchemaTree schemaTree) {
+ DeviceSchemaInfo deviceSchemaInfo =
+ schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+ measurementSchemas =
+ deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
}
- protected int countFailedMeasurements() {
- int result = 0;
- for (MeasurementSchema measurement : measurementSchemas) {
- if (measurement == null) {
- result++;
- }
+ /**
+ * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
+ * measurements, the failed values or columns would be null as well. We'd better use
+ * "measurements[index] == null" to determine if the measurement failed.
+ *
+ * @param index failed measurement index
+ */
+ public void markFailedMeasurementInsertion(int index, Exception e) {
+ // todo partial insert
+ if (measurements[index] == null) {
+ return;
}
- return result;
+ // if (failedMeasurements == null) {
+ // failedMeasurements = new ArrayList<>();
+ // }
+ // failedMeasurements.add(measurements[index]);
+ measurements[index] = null;
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 84d91add1e..94654b50fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -19,16 +19,23 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
@@ -53,11 +60,15 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
private static final Logger logger = LoggerFactory.getLogger(InsertRowNode.class);
+ private static final byte TYPE_RAW_STRING = -1;
+
private static final byte TYPE_NULL = -2;
private long time;
private Object[] values;
+ private boolean isNeedInferType = false;
+
public InsertRowNode(PlanNodeId id) {
super(id);
}
@@ -66,13 +77,15 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
PlanNodeId id,
PartialPath devicePath,
boolean isAligned,
- MeasurementSchema[] measurements,
+ String[] measurements,
TSDataType[] dataTypes,
long time,
- Object[] values) {
+ Object[] values,
+ boolean isNeedInferType) {
super(id, devicePath, isAligned, measurements, dataTypes);
this.time = time;
this.values = values;
+ this.isNeedInferType = isNeedInferType;
}
@Override
@@ -118,6 +131,131 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return null;
}
+ public Object[] getValues() {
+ return values;
+ }
+
+ public void setValues(Object[] values) {
+ this.values = values;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+
+ public boolean isNeedInferType() {
+ return isNeedInferType;
+ }
+
+ public void setNeedInferType(boolean needInferType) {
+ isNeedInferType = needInferType;
+ }
+
+ @Override
+ public boolean validateSchema(SchemaTree schemaTree) {
+ DeviceSchemaInfo deviceSchemaInfo =
+ schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+
+ List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
+
+ if (isNeedInferType) {
+ try {
+ transferType(measurementSchemas);
+ } catch (QueryProcessException e) {
+ return false;
+ }
+ } else {
+ // todo partial insert
+ if (deviceSchemaInfo.isAligned() != isAligned) {
+ return false;
+ }
+
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ return false;
+ } else {
+ markFailedMeasurementInsertion(
+ i,
+ new DataTypeMismatchException(
+ devicePath.getFullPath(),
+ measurements[i],
+ measurementSchemas.get(i).getType(),
+ dataTypes[i]));
+ }
+ }
+ }
+ }
+
+ // filter failed measurements
+ measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
+ dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
+ values = Arrays.stream(values).filter(Objects::nonNull).toArray(Object[]::new);
+
+ return true;
+ }
+
+ @Override
+ public void markFailedMeasurementInsertion(int index, Exception e) {
+ if (measurements[index] == null) {
+ return;
+ }
+ super.markFailedMeasurementInsertion(index, e);
+ values[index] = null;
+ dataTypes[index] = null;
+ }
+
+ /**
+ * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
+ * Double, Binary)
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public void transferType(List<MeasurementSchema> measurementSchemas)
+ throws QueryProcessException {
+ if (isNeedInferType) {
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ if (measurementSchemas.get(i) == null) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ markFailedMeasurementInsertion(
+ i,
+ new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath()
+ + IoTDBConstant.PATH_SEPARATOR
+ + measurements[i])));
+ } else {
+ throw new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+ }
+ continue;
+ }
+
+ dataTypes[i] = measurementSchemas.get(i).getType();
+ try {
+ values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+ } catch (Exception e) {
+ logger.warn(
+ "{}.{} data type is not consistent, input {}, registered {}",
+ devicePath,
+ measurements[i],
+ values[i],
+ dataTypes[i]);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ markFailedMeasurementInsertion(i, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ isNeedInferType = false;
+ }
+ }
+
@Override
public int serializedSize() {
int size = 0;
@@ -173,33 +311,12 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return size;
}
- @Override
- public int hashCode() {
- int result = Objects.hash(super.hashCode(), time);
- result = 31 * result + Arrays.hashCode(values);
- return result;
- }
-
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.INSERT_ROW.serialize(byteBuffer);
subSerialize(byteBuffer);
}
- public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
- // TODO: (xingtanzjr) remove placeholder
- InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
- insertNode.setTime(byteBuffer.getLong());
- try {
- insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
- } catch (IllegalPathException e) {
- throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
- }
- insertNode.deserializeMeasurementsAndValues(byteBuffer);
- insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
- return insertNode;
- }
-
void subSerialize(ByteBuffer buffer) {
buffer.putLong(time);
ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -207,12 +324,18 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
void serializeMeasurementsAndValues(ByteBuffer buffer) {
- buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+ buffer.putInt(measurements.length);
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- if (measurementSchema != null) {
+ // check whether has measurement schemas or not
+ buffer.put((byte) (measurementSchemas != null ? 1 : 0));
+ if (measurementSchemas != null) {
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
measurementSchema.serializeTo(buffer);
}
+ } else {
+ for (String measurement : measurements) {
+ ReadWriteIOUtils.write(measurement, buffer);
+ }
}
try {
@@ -221,16 +344,22 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
logger.error("Failed to serialize values for {}", this, e);
}
+ buffer.put((byte) (isNeedInferType ? 1 : 0));
buffer.put((byte) (isAligned ? 1 : 0));
}
private void putValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < values.length; i++) {
- if (dataTypes[i] != null) {
- if (values[i] == null) {
- ReadWriteIOUtils.write(TYPE_NULL, buffer);
- continue;
- }
+ if (values[i] == null) {
+ ReadWriteIOUtils.write(TYPE_NULL, buffer);
+ continue;
+ }
+ // types are not determined, the situation mainly occurs when the plan uses string values
+ // and is forwarded to other nodes
+ if (dataTypes == null || dataTypes[i] == null) {
+ ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
+ ReadWriteIOUtils.write(values[i].toString(), buffer);
+ } else {
ReadWriteIOUtils.write(dataTypes[i], buffer);
switch (dataTypes[i]) {
case BOOLEAN:
@@ -258,91 +387,39 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
}
- @Override
- public void serializeToWAL(IWALByteBufferView buffer) {
- buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
- subSerialize(buffer);
- }
-
- void subSerialize(IWALByteBufferView buffer) {
- buffer.putLong(time);
- WALWriteUtils.write(devicePath.getFullPath(), buffer);
- serializeMeasurementsAndValues(buffer);
+ public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
+ // TODO: (xingtanzjr) remove placeholder
+ InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
+ insertNode.subDeserialize(byteBuffer);
+ insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+ return insertNode;
}
- void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
- buffer.putInt(measurementSchemas.length - countFailedMeasurements());
-
- serializeMeasurementSchemaToWAL(buffer);
-
+ public void subDeserialize(ByteBuffer byteBuffer) {
+ time = byteBuffer.getLong();
try {
- putValues(buffer);
- } catch (QueryProcessException e) {
- logger.error("Failed to serialize values for {}", this, e);
- }
-
- buffer.put((byte) (isAligned ? 1 : 0));
- }
-
- private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
- // todo remove serialize datatype after serializing measurement schema
- for (int i = 0; i < values.length; i++) {
- if (dataTypes[i] != null) {
- if (values[i] == null) {
- WALWriteUtils.write(TYPE_NULL, buffer);
- continue;
- }
- WALWriteUtils.write(dataTypes[i], buffer);
- switch (dataTypes[i]) {
- case BOOLEAN:
- WALWriteUtils.write((Boolean) values[i], buffer);
- break;
- case INT32:
- WALWriteUtils.write((Integer) values[i], buffer);
- break;
- case INT64:
- WALWriteUtils.write((Long) values[i], buffer);
- break;
- case FLOAT:
- WALWriteUtils.write((Float) values[i], buffer);
- break;
- case DOUBLE:
- WALWriteUtils.write((Double) values[i], buffer);
- break;
- case TEXT:
- WALWriteUtils.write((Binary) values[i], buffer);
- break;
- default:
- throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
- }
- }
+ devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
}
- }
-
- public Object[] getValues() {
- return values;
- }
-
- public void setValues(Object[] values) {
- this.values = values;
- }
-
- public long getTime() {
- return time;
- }
-
- public void setTime(long time) {
- this.time = time;
+ deserializeMeasurementsAndValues(byteBuffer);
}
void deserializeMeasurementsAndValues(ByteBuffer buffer) {
int measurementSize = buffer.getInt();
this.measurements = new String[measurementSize];
- this.measurementSchemas = new MeasurementSchema[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
- measurements[i] = measurementSchemas[i].getMeasurementId();
+ boolean hasSchema = buffer.get() == 1;
+ if (hasSchema) {
+ this.measurementSchemas = new MeasurementSchema[measurementSize];
+ for (int i = 0; i < measurementSize; i++) {
+ measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+ measurements[i] = measurementSchemas[i].getMeasurementId();
+ }
+ } else {
+ for (int i = 0; i < measurementSize; i++) {
+ measurements[i] = ReadWriteIOUtils.readString(buffer);
+ }
}
this.dataTypes = new TSDataType[measurementSize];
@@ -353,14 +430,18 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
e.printStackTrace();
}
+ isNeedInferType = buffer.get() == 1;
isAligned = buffer.get() == 1;
}
/** Make sure the values is already inited before calling this */
public void fillValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < dataTypes.length; i++) {
+ // types are not determined, the situation mainly occurs when the node uses string values
+ // and is forwarded to other nodes
byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
- if (typeNum == TYPE_NULL) {
+ if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
+ values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
continue;
}
dataTypes[i] = TSDataType.values()[typeNum];
@@ -389,6 +470,65 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
}
+ @Override
+ public void serializeToWAL(IWALByteBufferView buffer) {
+ buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
+ subSerialize(buffer);
+ }
+
+ void subSerialize(IWALByteBufferView buffer) {
+ buffer.putLong(time);
+ WALWriteUtils.write(devicePath.getFullPath(), buffer);
+ serializeMeasurementsAndValues(buffer);
+ }
+
+ void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
+ buffer.putInt(measurementSchemas.length);
+
+ serializeMeasurementSchemaToWAL(buffer);
+
+ try {
+ putValues(buffer);
+ } catch (QueryProcessException e) {
+ logger.error("Failed to serialize values for {}", this, e);
+ }
+
+ buffer.put((byte) (isAligned ? 1 : 0));
+ }
+
+ private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
+ // todo remove serialize datatype after serializing measurement schema
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] == null) {
+ WALWriteUtils.write(TYPE_NULL, buffer);
+ continue;
+ }
+ WALWriteUtils.write(dataTypes[i], buffer);
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ WALWriteUtils.write((Boolean) values[i], buffer);
+ break;
+ case INT32:
+ WALWriteUtils.write((Integer) values[i], buffer);
+ break;
+ case INT64:
+ WALWriteUtils.write((Long) values[i], buffer);
+ break;
+ case FLOAT:
+ WALWriteUtils.write((Float) values[i], buffer);
+ break;
+ case DOUBLE:
+ WALWriteUtils.write((Double) values[i], buffer);
+ break;
+ case TEXT:
+ WALWriteUtils.write((Binary) values[i], buffer);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ }
+ }
+ }
+
public static InsertRowNode deserialize(DataInputStream stream)
throws IOException, IllegalPathException {
// This method is used for deserialize from wal
@@ -458,6 +598,15 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
InsertRowNode that = (InsertRowNode) o;
- return time == that.time && Arrays.equals(values, that.values);
+ return time == that.time
+ && isNeedInferType == that.isNeedInferType
+ && Arrays.equals(values, that.values);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(super.hashCode(), time, isNeedInferType);
+ result = 31 * result + Arrays.hashCode(values);
+ return result;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 2cf83f8f2e..eccede30d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -22,11 +22,15 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,7 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class InsertRowsNode extends InsertNode {
+public class InsertRowsNode extends InsertNode implements BatchInsert {
/**
* Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -100,6 +104,16 @@ public class InsertRowsNode extends InsertNode {
@Override
public void addChild(PlanNode child) {}
+ @Override
+ public boolean validateSchema(SchemaTree schemaTree) {
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ if (!insertRowNode.validateSchema(schemaTree)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -140,12 +154,81 @@ public class InsertRowsNode extends InsertNode {
return null;
}
+ @Override
+ public List<PartialPath> getDevicePaths() {
+ List<PartialPath> partialPaths = new ArrayList<>();
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ partialPaths.add(insertRowNode.devicePath);
+ }
+ return partialPaths;
+ }
+
+ @Override
+ public List<String[]> getMeasurementsList() {
+ List<String[]> measurementsList = new ArrayList<>();
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ measurementsList.add(insertRowNode.measurements);
+ }
+ return measurementsList;
+ }
+
+ @Override
+ public List<TSDataType[]> getDataTypesList() {
+ List<TSDataType[]> dataTypesList = new ArrayList<>();
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ dataTypesList.add(insertRowNode.dataTypes);
+ }
+ return dataTypesList;
+ }
+
+ @Override
+ public List<Boolean> getAlignedList() {
+ List<Boolean> alignedList = new ArrayList<>();
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ alignedList.add(insertRowNode.isAligned);
+ }
+ return alignedList;
+ }
+
public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
- return null;
+ PlanNodeId planNodeId;
+ List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+ List<Integer> insertRowNodeIndex = new ArrayList<>();
+
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; i++) {
+ InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId(""));
+ insertRowNode.subDeserialize(byteBuffer);
+ insertRowNodeList.add(insertRowNode);
+ }
+ for (int i = 0; i < size; i++) {
+ insertRowNodeIndex.add(byteBuffer.getInt());
+ }
+
+ planNodeId = PlanNodeId.deserialize(byteBuffer);
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ insertRowNode.setPlanNodeId(planNodeId);
+ }
+
+ InsertRowsNode insertRowsNode = new InsertRowsNode(planNodeId);
+ insertRowsNode.setInsertRowNodeList(insertRowNodeList);
+ insertRowsNode.setInsertRowNodeIndexList(insertRowNodeIndex);
+ return insertRowsNode;
}
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INSERT_ROWS.serialize(byteBuffer);
+
+ byteBuffer.putInt(insertRowNodeList.size());
+
+ for (InsertRowNode node : insertRowNodeList) {
+ node.subSerialize(byteBuffer);
+ }
+ for (Integer index : insertRowNodeIndexList) {
+ byteBuffer.putInt(index);
+ }
+ }
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 16abdd1049..1705d10f38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -22,13 +22,18 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -121,12 +126,15 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return null;
}
- public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
- return null;
- }
-
@Override
- public void serialize(ByteBuffer byteBuffer) {}
+ public boolean validateSchema(SchemaTree schemaTree) {
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ if (!insertRowNode.validateSchema(schemaTree)) {
+ return false;
+ }
+ }
+ return true;
+ }
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -154,6 +162,63 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return new ArrayList<>(splitMap.values());
}
+ public void addOneInsertRowNode(InsertRowNode node, int index) {
+ insertRowNodeList.add(node);
+ insertRowNodeIndexList.add(index);
+ }
+
+ public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
+ PartialPath devicePath;
+ PlanNodeId planNodeId;
+ List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+ List<Integer> insertRowNodeIndex = new ArrayList<>();
+
+ try {
+ devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowsOfOneDeviceNode", e);
+ }
+
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; i++) {
+ InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId(""));
+ insertRowNode.setDevicePath(devicePath);
+ insertRowNode.setTime(byteBuffer.getLong());
+ insertRowNode.deserializeMeasurementsAndValues(byteBuffer);
+ insertRowNodeList.add(insertRowNode);
+ }
+ for (int i = 0; i < size; i++) {
+ insertRowNodeIndex.add(byteBuffer.getInt());
+ }
+
+ planNodeId = PlanNodeId.deserialize(byteBuffer);
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ insertRowNode.setPlanNodeId(planNodeId);
+ }
+
+ InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode = new InsertRowsOfOneDeviceNode(planNodeId);
+ insertRowsOfOneDeviceNode.setInsertRowNodeList(insertRowNodeList);
+ insertRowsOfOneDeviceNode.setInsertRowNodeIndexList(insertRowNodeIndex);
+ insertRowsOfOneDeviceNode.setDevicePath(devicePath);
+ return insertRowsOfOneDeviceNode;
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.serialize(byteBuffer);
+ ReadWriteIOUtils.write(devicePath.getFullPath(), byteBuffer);
+
+ byteBuffer.putInt(insertRowNodeList.size());
+
+ for (InsertRowNode node : insertRowNodeList) {
+ byteBuffer.putLong(node.getTime());
+ node.serializeMeasurementsAndValues(byteBuffer);
+ }
+ for (Integer index : insertRowNodeIndexList) {
+ byteBuffer.putInt(index);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 1b34a0ce04..0b9c307568 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -20,10 +20,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -83,13 +88,13 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
PlanNodeId id,
PartialPath devicePath,
boolean isAligned,
- MeasurementSchema[] measurementSchemas,
+ String[] measurements,
TSDataType[] dataTypes,
long[] times,
BitMap[] bitMaps,
Object[] columns,
int rowCount) {
- super(id, devicePath, isAligned, measurementSchemas, dataTypes);
+ super(id, devicePath, isAligned, measurements, dataTypes);
this.times = times;
this.bitMaps = bitMaps;
this.columns = columns;
@@ -169,6 +174,148 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
return null;
}
+ @Override
+ public boolean validateSchema(SchemaTree schemaTree) {
+ DeviceSchemaInfo deviceSchemaInfo =
+ schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+
+ // todo partial insert
+ if (deviceSchemaInfo.isAligned() != isAligned) {
+ return false;
+ }
+
+ List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ return false;
+ } else {
+ markFailedMeasurementInsertion(
+ i,
+ new DataTypeMismatchException(
+ devicePath.getFullPath(),
+ measurements[i],
+ measurementSchemas.get(i).getType(),
+ dataTypes[i]));
+ }
+ }
+ }
+
+ // filter failed measurements
+ measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
+ dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
+ columns = Arrays.stream(columns).filter(Objects::nonNull).toArray(Object[]::new);
+
+ return true;
+ }
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ // only single device in single storage group
+ List<WritePlanNode> result = new ArrayList<>();
+ if (times.length == 0) {
+ return Collections.emptyList();
+ }
+ long startTime =
+ (times[0] / StorageEngine.getTimePartitionInterval())
+ * StorageEngine.getTimePartitionInterval(); // included
+ long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
+ TTimePartitionSlot timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[0]);
+ int startLoc = 0; // included
+
+ List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+ // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ...
+ List<Integer> ranges = new ArrayList<>();
+ for (int i = 1; i < times.length; i++) { // times are sorted in session API.
+ if (times[i] >= endTime) {
+ // a new range.
+ ranges.add(startLoc); // included
+ ranges.add(i); // excluded
+ timePartitionSlots.add(timePartitionSlot);
+ // next init
+ startLoc = i;
+ startTime = endTime;
+ endTime =
+ (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+ * StorageEngine.getTimePartitionInterval();
+ timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[i]);
+ }
+ }
+
+ // the final range
+ ranges.add(startLoc); // included
+ ranges.add(times.length); // excluded
+ timePartitionSlots.add(timePartitionSlot);
+
+ // data region for each time partition
+ List<TRegionReplicaSet> dataRegionReplicaSets =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
+
+ Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+ for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
+ List<Integer> sub_ranges =
+ splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>());
+ sub_ranges.add(ranges.get(i));
+ sub_ranges.add(ranges.get(i + 1));
+ }
+
+ List<Integer> locs;
+ for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
+ // generate a new times and values
+ locs = entry.getValue();
+ int count = 0;
+ for (int i = 0; i < locs.size(); i += 2) {
+ int start = locs.get(i);
+ int end = locs.get(i + 1);
+ count += end - start;
+ }
+ long[] subTimes = new long[count];
+ int destLoc = 0;
+ Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+ BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count);
+ for (int i = 0; i < locs.size(); i += 2) {
+ int start = locs.get(i);
+ int end = locs.get(i + 1);
+ System.arraycopy(times, start, subTimes, destLoc, end - start);
+ for (int k = 0; k < values.length; k++) {
+ System.arraycopy(columns[k], start, values[k], destLoc, end - start);
+ if (bitMaps != null && this.bitMaps[k] != null) {
+ BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start);
+ }
+ }
+ destLoc += end - start;
+ }
+ InsertTabletNode subNode =
+ new InsertTabletNode(
+ getPlanNodeId(),
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ subTimes,
+ bitMaps,
+ values,
+ subTimes.length);
+ subNode.setRange(locs);
+ subNode.setDataRegionReplicaSet(entry.getKey());
+ result.add(subNode);
+ }
+ return result;
+ }
+
+ @Override
+ public void markFailedMeasurementInsertion(int index, Exception e) {
+ if (measurements[index] == null) {
+ return;
+ }
+ super.markFailedMeasurementInsertion(index, e);
+ dataTypes[index] = null;
+ columns[index] = null;
+ bitMaps[index] = null;
+ }
+
@Override
public int serializedSize() {
return serializedSize(0, rowCount);
@@ -188,13 +335,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
size += serializeMeasurementSchemaSize();
- // data types size
- size += Integer.BYTES;
- for (int i = 0; i < dataTypes.length; i++) {
- if (measurements[i] != null) {
- size += Byte.BYTES;
- }
- }
+ size += Byte.BYTES * dataTypes.length;
+
// times size
size += Integer.BYTES;
size += Long.BYTES * (end - start);
@@ -217,7 +359,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
size += getColumnSize(dataTypes[i], columns[i], start, end);
}
}
- size += Long.BYTES;
+
size += Byte.BYTES;
return size;
}
@@ -267,19 +409,24 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
private void writeMeasurements(ByteBuffer buffer) {
- buffer.putInt(measurementSchemas.length - countFailedMeasurements());
- for (MeasurementSchema measurement : measurementSchemas) {
- if (measurement != null) {
+ buffer.putInt(measurements.length);
+
+ // check whether has measurement schemas or not
+ buffer.put((byte) (measurementSchemas != null ? 1 : 0));
+
+ if (measurementSchemas != null) {
+ for (MeasurementSchema measurement : measurementSchemas) {
measurement.serializeTo(buffer);
}
+ } else {
+ for (String measurement : measurements) {
+ ReadWriteIOUtils.write(measurement, buffer);
+ }
}
}
private void writeDataTypes(ByteBuffer buffer) {
for (TSDataType dataType : dataTypes) {
- if (dataType == null) {
- continue;
- }
dataType.serializeTo(buffer);
}
}
@@ -295,15 +442,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
for (int i = 0; i < measurements.length; i++) {
- // check failed measurement
- if (measurements[i] != null) {
- BitMap bitMap = bitMaps[i];
- if (bitMap == null) {
- buffer.put(BytesUtils.boolToByte(false));
- } else {
- buffer.put(BytesUtils.boolToByte(true));
- buffer.put(bitMap.getByteArray());
- }
+ BitMap bitMap = bitMaps[i];
+ if (bitMap == null) {
+ buffer.put(BytesUtils.boolToByte(false));
+ } else {
+ buffer.put(BytesUtils.boolToByte(true));
+ buffer.put(bitMap.getByteArray());
}
}
}
@@ -311,9 +455,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
private void writeValues(ByteBuffer buffer) {
for (int i = 0; i < dataTypes.length; i++) {
- if (columns[i] == null) {
- continue;
- }
serializeColumn(dataTypes[i], columns[i], buffer);
}
}
@@ -383,91 +524,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
private void writeMeasurements(IWALByteBufferView buffer) {
- buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+ buffer.putInt(measurementSchemas.length);
serializeMeasurementSchemaToWAL(buffer);
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
- InsertTabletNode that = (InsertTabletNode) o;
- return rowCount == that.rowCount
- && Arrays.equals(times, that.times)
- && Arrays.equals(bitMaps, that.bitMaps)
- && equals(that.columns)
- && Objects.equals(range, that.range);
- }
-
- private boolean equals(Object[] columns) {
- if (this.columns == columns) {
- return true;
- }
-
- if (columns == null || this.columns == null || columns.length != this.columns.length) {
- return false;
- }
-
- for (int i = 0; i < columns.length; i++) {
- if (dataTypes[i] != null) {
- switch (dataTypes[i]) {
- case INT32:
- if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) {
- return false;
- }
- break;
- case INT64:
- if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) {
- return false;
- }
- break;
- case FLOAT:
- if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) {
- return false;
- }
- break;
- case DOUBLE:
- if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) {
- return false;
- }
- break;
- case BOOLEAN:
- if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) {
- return false;
- }
- break;
- case TEXT:
- if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) {
- return false;
- }
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
- }
- } else if (!columns[i].equals(columns)) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = Objects.hash(super.hashCode(), rowCount, range);
- result = 31 * result + Arrays.hashCode(times);
- result = 31 * result + Arrays.hashCode(bitMaps);
- result = 31 * result + Arrays.hashCode(columns);
- return result;
- }
-
private void writeDataTypes(IWALByteBufferView buffer) {
for (TSDataType dataType : dataTypes) {
- if (dataType == null) {
- continue;
- }
WALWriteUtils.write(dataType, buffer);
}
}
@@ -483,18 +545,15 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
for (int i = 0; i < measurements.length; i++) {
- // check failed measurement
- if (measurements[i] != null) {
- BitMap bitMap = bitMaps[i];
- if (bitMap == null) {
- buffer.put(BytesUtils.boolToByte(false));
- } else {
- buffer.put(BytesUtils.boolToByte(true));
- int len = end - start;
- BitMap partBitMap = new BitMap(len);
- BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
- buffer.put(partBitMap.getByteArray());
- }
+ BitMap bitMap = bitMaps[i];
+ if (bitMap == null) {
+ buffer.put(BytesUtils.boolToByte(false));
+ } else {
+ buffer.put(BytesUtils.boolToByte(true));
+ int len = end - start;
+ BitMap partBitMap = new BitMap(len);
+ BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+ buffer.put(partBitMap.getByteArray());
}
}
}
@@ -502,9 +561,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
private void writeValues(IWALByteBufferView buffer, int start, int end) {
for (int i = 0; i < dataTypes.length; i++) {
- if (columns[i] == null) {
- continue;
- }
serializeColumn(dataTypes[i], columns[i], buffer, start, end);
}
}
@@ -554,102 +610,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
}
- @Override
- public List<WritePlanNode> splitByPartition(Analysis analysis) {
- // only single device in single storage group
- List<WritePlanNode> result = new ArrayList<>();
- if (times.length == 0) {
- return Collections.emptyList();
- }
- long startTime =
- (times[0] / StorageEngineV2.getTimePartitionInterval())
- * StorageEngineV2.getTimePartitionInterval(); // included
- long endTime = startTime + StorageEngineV2.getTimePartitionInterval(); // excluded
- TTimePartitionSlot timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[0]);
- int startLoc = 0; // included
-
- List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
- // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ...
- List<Integer> ranges = new ArrayList<>();
- for (int i = 1; i < times.length; i++) { // times are sorted in session API.
- if (times[i] >= endTime) {
- // a new range.
- ranges.add(startLoc); // included
- ranges.add(i); // excluded
- timePartitionSlots.add(timePartitionSlot);
- // next init
- startLoc = i;
- startTime = endTime;
- endTime =
- (times[i] / StorageEngineV2.getTimePartitionInterval() + 1)
- * StorageEngineV2.getTimePartitionInterval();
- timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[i]);
- }
- }
-
- // the final range
- ranges.add(startLoc); // included
- ranges.add(times.length); // excluded
- timePartitionSlots.add(timePartitionSlot);
-
- // data region for each time partition
- List<TRegionReplicaSet> dataRegionReplicaSets =
- analysis
- .getDataPartitionInfo()
- .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
-
- Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
- for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
- List<Integer> sub_ranges =
- splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>());
- sub_ranges.add(ranges.get(i));
- sub_ranges.add(ranges.get(i + 1));
- }
-
- List<Integer> locs;
- for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
- // generate a new times and values
- locs = entry.getValue();
- int count = 0;
- for (int i = 0; i < locs.size(); i += 2) {
- int start = locs.get(i);
- int end = locs.get(i + 1);
- count += end - start;
- }
- long[] subTimes = new long[count];
- int destLoc = 0;
- Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
- BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count);
- for (int i = 0; i < locs.size(); i += 2) {
- int start = locs.get(i);
- int end = locs.get(i + 1);
- System.arraycopy(times, start, subTimes, destLoc, end - start);
- for (int k = 0; k < values.length; k++) {
- System.arraycopy(columns[k], start, values[k], destLoc, end - start);
- if (bitMaps != null && this.bitMaps[k] != null) {
- BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start);
- }
- }
- destLoc += end - start;
- }
- InsertTabletNode subNode =
- new InsertTabletNode(
- getPlanNodeId(),
- devicePath,
- isAligned,
- measurementSchemas,
- dataTypes,
- subTimes,
- bitMaps,
- values,
- subTimes.length);
- subNode.setRange(locs);
- subNode.setDataRegionReplicaSet(entry.getKey());
- result.add(subNode);
- }
- return result;
- }
-
private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
Object[] values = new Object[columnSize];
for (int i = 0; i < values.length; i++) {
@@ -687,24 +647,33 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
- try {
- insertNode.subDeserialize(byteBuffer);
- } catch (IllegalPathException e) {
- throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
- }
+ insertNode.subDeserialize(byteBuffer);
insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
return insertNode;
}
- private void subDeserialize(ByteBuffer buffer) throws IllegalPathException {
- this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+ public void subDeserialize(ByteBuffer buffer) {
+ try {
+ this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+ }
int measurementSize = buffer.getInt();
this.measurements = new String[measurementSize];
- this.measurementSchemas = new MeasurementSchema[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
- measurements[i] = measurementSchemas[i].getMeasurementId();
+
+ boolean hasSchema = buffer.get() == 1;
+
+ if (hasSchema) {
+ this.measurementSchemas = new MeasurementSchema[measurementSize];
+ for (int i = 0; i < measurementSize; i++) {
+ measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+ measurements[i] = measurementSchemas[i].getMeasurementId();
+ }
+ } else {
+ for (int i = 0; i < measurementSize; i++) {
+ measurements[i] = ReadWriteIOUtils.readString(buffer);
+ }
}
this.dataTypes = new TSDataType[measurementSize];
@@ -761,4 +730,80 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rows);
this.isAligned = stream.readByte() == 1;
}
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(super.hashCode(), rowCount, range);
+ result = 31 * result + Arrays.hashCode(times);
+ result = 31 * result + Arrays.hashCode(bitMaps);
+ result = 31 * result + Arrays.hashCode(columns);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ InsertTabletNode that = (InsertTabletNode) o;
+ return rowCount == that.rowCount
+ && Arrays.equals(times, that.times)
+ && Arrays.equals(bitMaps, that.bitMaps)
+ && equals(that.columns)
+ && Objects.equals(range, that.range);
+ }
+
+ private boolean equals(Object[] columns) {
+ if (this.columns == columns) {
+ return true;
+ }
+
+ if (columns == null || this.columns == null || columns.length != this.columns.length) {
+ return false;
+ }
+
+ for (int i = 0; i < columns.length; i++) {
+ if (dataTypes[i] != null) {
+ switch (dataTypes[i]) {
+ case INT32:
+ if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) {
+ return false;
+ }
+ break;
+ case INT64:
+ if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) {
+ return false;
+ }
+ break;
+ case FLOAT:
+ if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) {
+ return false;
+ }
+ break;
+ case DOUBLE:
+ if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) {
+ return false;
+ }
+ break;
+ case BOOLEAN:
+ if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) {
+ return false;
+ }
+ break;
+ case TEXT:
+ if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) {
+ return false;
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
+ }
+ } else if (!columns[i].equals(columns)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
new file mode 100644
index 0000000000..6592cadd52
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+
+/** BatchInsert contains multiple sub insert. */
+public interface BatchInsert {
+
+ List<PartialPath> getDevicePaths();
+
+ List<String[]> getMeasurementsList();
+
+ List<TSDataType[]> getDataTypesList();
+
+ List<Boolean> getAlignedList();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index ab54768a17..cd4e9ec890 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -19,13 +19,9 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import java.util.ArrayList;
-import java.util.List;
-
public abstract class InsertBaseStatement extends Statement {
/**
@@ -40,9 +36,6 @@ public abstract class InsertBaseStatement extends Statement {
// get from client
protected TSDataType[] dataTypes;
- // record the failed measurements, their reasons, and positions in "measurements"
- List<String> failedMeasurements;
-
public PartialPath getDevicePath() {
return devicePath;
}
@@ -74,28 +67,4 @@ public abstract class InsertBaseStatement extends Statement {
public void setAligned(boolean aligned) {
isAligned = aligned;
}
-
- public List<String> getFailedMeasurements() {
- return failedMeasurements;
- }
-
- public abstract boolean checkDataType(SchemaTree schemaTree);
-
- /**
- * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
- * measurements, the failed values or columns would be null as well. We'd better use
- * "measurements[index] == null" to determine if the measurement failed.
- *
- * @param index failed measurement index
- */
- public void markFailedMeasurementInsertion(int index, Exception e) {
- if (measurements[index] == null) {
- return;
- }
- if (failedMeasurements == null) {
- failedMeasurements = new ArrayList<>();
- }
- failedMeasurements.add(measurements[index]);
- measurements[index] = null;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
index 159fe87743..5b7aa98e9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -72,16 +71,6 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement {
return alignedList;
}
- @Override
- public boolean checkDataType(SchemaTree schemaTree) {
- for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
- if (!insertTabletStatement.checkDataType(schemaTree)) {
- return false;
- }
- }
- return true;
- }
-
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertMultiTablets(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
index 6d2ea03035..72dfed7322 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
@@ -20,30 +20,18 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.constant.StatementType;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class InsertRowStatement extends InsertBaseStatement {
- private static final Logger logger = LoggerFactory.getLogger(InsertRowStatement.class);
private static final byte TYPE_RAW_STRING = -1;
private static final byte TYPE_NULL = -2;
@@ -117,92 +105,10 @@ public class InsertRowStatement extends InsertBaseStatement {
}
}
- /**
- * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
- * Double, Binary)
- */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public void transferType(SchemaTree schemaTree) throws QueryProcessException {
- List<MeasurementSchema> measurementSchemas =
- schemaTree
- .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
- .getMeasurementSchemaList();
- if (isNeedInferType) {
- for (int i = 0; i < measurementSchemas.size(); i++) {
- if (measurementSchemas.get(i) == null) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- markFailedMeasurementInsertion(
- i,
- new QueryProcessException(
- new PathNotExistException(
- devicePath.getFullPath()
- + IoTDBConstant.PATH_SEPARATOR
- + measurements[i])));
- } else {
- throw new QueryProcessException(
- new PathNotExistException(
- devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
- }
- continue;
- }
-
- dataTypes[i] = measurementSchemas.get(i).getType();
- try {
- values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
- } catch (Exception e) {
- logger.warn(
- "{}.{} data type is not consistent, input {}, registered {}",
- devicePath,
- measurements[i],
- values[i],
- dataTypes[i]);
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- markFailedMeasurementInsertion(i, e);
- } else {
- throw e;
- }
- }
- }
- }
- }
-
- @Override
- public void markFailedMeasurementInsertion(int index, Exception e) {
- if (measurements[index] == null) {
- return;
- }
- super.markFailedMeasurementInsertion(index, e);
- values[index] = null;
- dataTypes[index] = null;
- }
-
public List<TTimePartitionSlot> getTimePartitionSlots() {
return Collections.singletonList(StorageEngineV2.getTimePartitionSlot(time));
}
- public boolean checkDataType(SchemaTree schemaTree) {
- List<MeasurementSchema> measurementSchemas =
- schemaTree
- .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
- .getMeasurementSchemaList();
- for (int i = 0; i < measurementSchemas.size(); i++) {
- if (dataTypes[i] != measurementSchemas.get(i).getType()) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- return false;
- } else {
- markFailedMeasurementInsertion(
- i,
- new DataTypeMismatchException(
- devicePath.getFullPath(),
- measurements[i],
- measurementSchemas.get(i).getType(),
- dataTypes[i]));
- }
- }
- }
- return true;
- }
-
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRow(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
index 438642ad43..71eccaa20d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -75,22 +73,6 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
return new ArrayList<>(timePartitionSlotSet);
}
- @Override
- public boolean checkDataType(SchemaTree schemaTree) {
- for (InsertRowStatement insertRowStatement : insertRowStatementList) {
- if (!insertRowStatement.checkDataType(schemaTree)) {
- return false;
- }
- }
- return true;
- }
-
- public void transferType(SchemaTree schemaTree) throws QueryProcessException {
- for (InsertRowStatement insertRowStatement : insertRowStatementList) {
- insertRowStatement.transferType(schemaTree);
- }
- }
-
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRowsOfOneDevice(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
index f778d2d7d0..d740a2290f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -73,22 +71,6 @@ public class InsertRowsStatement extends InsertBaseStatement {
this.insertRowStatementList = insertRowStatementList;
}
- @Override
- public boolean checkDataType(SchemaTree schemaTree) {
- for (InsertRowStatement insertRowStatement : insertRowStatementList) {
- if (!insertRowStatement.checkDataType(schemaTree)) {
- return false;
- }
- }
- return true;
- }
-
- public void transferType(SchemaTree schemaTree) throws QueryProcessException {
- for (InsertRowStatement insertRowStatement : insertRowStatementList) {
- insertRowStatement.transferType(schemaTree);
- }
- }
-
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRows(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 1d08cba075..612d2f8463 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -19,16 +19,12 @@
package org.apache.iotdb.db.mpp.sql.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
public class InsertTabletStatement extends InsertBaseStatement {
@@ -71,31 +67,20 @@ public class InsertTabletStatement extends InsertBaseStatement {
this.times = times;
}
- @Override
- public void markFailedMeasurementInsertion(int index, Exception e) {
- if (measurements[index] == null) {
- return;
- }
- super.markFailedMeasurementInsertion(index, e);
- dataTypes[index] = null;
- columns[index] = null;
- bitMaps[index] = null;
- }
-
public List<TTimePartitionSlot> getTimePartitionSlots() {
List<TTimePartitionSlot> result = new ArrayList<>();
long startTime =
- (times[0] / StorageEngineV2.getTimePartitionInterval())
- * StorageEngineV2.getTimePartitionInterval(); // included
- long endTime = startTime + StorageEngineV2.getTimePartitionInterval(); // excluded
+ (times[0] / StorageEngine.getTimePartitionInterval())
+ * StorageEngine.getTimePartitionInterval(); // included
+ long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
TTimePartitionSlot timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[0]);
for (int i = 1; i < times.length; i++) { // times are sorted in session API.
if (times[i] >= endTime) {
result.add(timePartitionSlot);
// next init
endTime =
- (times[i] / StorageEngineV2.getTimePartitionInterval() + 1)
- * StorageEngineV2.getTimePartitionInterval();
+ (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+ * StorageEngine.getTimePartitionInterval();
timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[i]);
}
}
@@ -103,30 +88,6 @@ public class InsertTabletStatement extends InsertBaseStatement {
return result;
}
- @Override
- public boolean checkDataType(SchemaTree schemaTree) {
- List<MeasurementSchema> measurementSchemas =
- schemaTree
- .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
- .getMeasurementSchemaList();
- for (int i = 0; i < measurementSchemas.size(); i++) {
- if (dataTypes[i] != measurementSchemas.get(i).getType()) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- return false;
- } else {
- markFailedMeasurementInsertion(
- i,
- new DataTypeMismatchException(
- devicePath.getFullPath(),
- measurements[i],
- measurementSchemas.get(i).getType(),
- dataTypes[i]));
- }
- }
- }
- return true;
- }
-
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertTablet(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index eb77ccb6cd..493b8466a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -37,12 +37,18 @@ import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
@@ -93,9 +99,22 @@ public class InternalServiceImpl implements InternalService.Iface {
return new TSendFragmentInstanceResp(!info.getState().isFailed());
case WRITE:
TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
- ConsensusWriteResponse resp =
- ConsensusImpl.getInstance()
- .write(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+ ConsensusWriteResponse resp;
+
+ FragmentInstance fragmentInstance =
+ FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+ PlanNode planNode = fragmentInstance.getFragment().getRoot();
+ if (planNode instanceof InsertNode) {
+ try {
+ SchemaTree schemaTree = SchemaValidator.validate((InsertNode) planNode);
+ ((InsertNode) planNode).setMeasurementSchemas(schemaTree);
+ } catch (SemanticException e) {
+ response.setAccepted(false);
+ response.setMessage(e.getMessage());
+ return response;
+ }
+ }
+ resp = ConsensusImpl.getInstance().write(groupId, fragmentInstance);
// TODO need consider more status
response.setAccepted(
TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index a49e05acbd..1f043fb1bf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -117,14 +117,18 @@ public class DataRegionTest {
values[i] = record.dataPointList.get(i).getValue();
}
QueryId queryId = new QueryId("test_write");
- return new InsertRowNode(
- queryId.genPlanNodeId(),
- new PartialPath(record.deviceId),
- false,
- measurementSchemas,
- dataTypes,
- record.time,
- values);
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath(record.deviceId),
+ false,
+ measurements,
+ dataTypes,
+ record.time,
+ values,
+ false);
+ insertRowNode.setMeasurementSchemas(measurementSchemas);
+ return insertRowNode;
}
@Test
@@ -276,12 +280,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -297,12 +302,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -444,12 +450,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -464,12 +471,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -534,12 +542,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -554,12 +563,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -624,12 +634,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -644,12 +655,13 @@ public class DataRegionTest {
new QueryId("test_write").genPlanNodeId(),
new PartialPath("root.vehicle.d0"),
false,
- measurementSchemas,
+ measurements,
dataTypes,
times,
null,
columns,
times.length);
+ insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index f97bf51030..999fb8990f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -463,15 +463,20 @@ public class TsFileProcessorV2Test {
((long[]) columns[i])[(int) r] = r;
}
}
- return new InsertTabletNode(
- new QueryId("test_write").genPlanNodeId(),
- new PartialPath(deviceId),
- isAligned,
- schemas,
- dataTypes,
- times,
- null,
- columns,
- times.length);
+
+ InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath(deviceId),
+ isAligned,
+ measurements,
+ dataTypes,
+ times,
+ null,
+ columns,
+ times.length);
+ insertTabletNode.setMeasurementSchemas(schemas);
+
+ return insertTabletNode;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 63e0e59836..0eef8c3f02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -313,12 +313,17 @@ public class DistributionPlannerTest {
queryId.genPlanNodeId(),
new PartialPath("root.sg.d1"),
false,
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
+ new String[] {
+ "s1",
},
new TSDataType[] {TSDataType.INT32},
1L,
- new Object[] {10});
+ new Object[] {10},
+ false);
+ insertRowNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ });
Analysis analysis = constructAnalysis();
@@ -340,24 +345,30 @@ public class DistributionPlannerTest {
queryId.genPlanNodeId(),
new PartialPath("root.sg.d1"),
false,
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
- },
+ new String[] {"s1"},
new TSDataType[] {TSDataType.INT32},
1L,
- new Object[] {10});
+ new Object[] {10},
+ false);
+ insertRowNode1.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ });
InsertRowNode insertRowNode2 =
new InsertRowNode(
queryId.genPlanNodeId(),
new PartialPath("root.sg.d1"),
false,
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
- },
+ new String[] {"s1"},
new TSDataType[] {TSDataType.INT32},
100000L,
- new Object[] {10});
+ new Object[] {10},
+ false);
+ insertRowNode2.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ });
InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertMultiTabletsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
new file mode 100644
index 0000000000..a34b0eb68e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class InsertMultiTabletsNodeSerdeTest {
+
+ @Test
+ public void testInsertMultiTabletPlan()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ }
+
+ PlanNodeId planNodeId = new PlanNodeId("plan node 1");
+
+ InsertMultiTabletsNode insertMultiTabletsNode = new InsertMultiTabletsNode(planNodeId);
+
+ for (int i = 0; i < 10; i++) {
+ insertMultiTabletsNode.addInsertTabletNode(
+ new InsertTabletNode(
+ planNodeId,
+ new PartialPath("root.multi.d" + i),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ times,
+ null,
+ columns,
+ times.length),
+ i);
+ }
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+ insertMultiTabletsNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PlanNodeType.INSERT_MULTI_TABLET.ordinal(), byteBuffer.getShort());
+
+ Assert.assertEquals(InsertMultiTabletsNode.deserialize(byteBuffer), insertMultiTabletsNode);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
index 03cc69769c..229fbd0722 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
@@ -24,12 +24,16 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class InsertRowNodeSerdeTest {
@@ -46,18 +50,50 @@ public class InsertRowNodeSerdeTest {
Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
- // Test with failed column
- insertRowNode = getInsertRowNodeWithFailedColumn();
+ insertRowNode = getInsertRowNodeWithMeasurementSchemas();
byteBuffer = ByteBuffer.allocate(10000);
insertRowNode.serialize(byteBuffer);
byteBuffer.flip();
Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
- InsertRowNode tmpNode = InsertRowNode.deserialize(byteBuffer);
+ Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+
+ insertRowNode = getInsertRowNodeWithStringValue();
+ byteBuffer = ByteBuffer.allocate(10000);
+ insertRowNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
+
+ Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+ }
+
+ @Test
+ public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
+ InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+
+ int serializedSize = insertRowNode.serializedSize();
+
+ Assert.assertEquals(serializedSize, 125);
+
+ byte[] bytes = new byte[serializedSize];
+ WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+
+ insertRowNode.serializeToWAL(walBuffer);
+
+ DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+
+ Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), dataInputStream.readShort());
+
+ InsertRowNode tmpNode = InsertRowNode.deserialize(dataInputStream);
Assert.assertEquals(tmpNode.getTime(), insertRowNode.getTime());
- Assert.assertEquals(tmpNode.getMeasurements(), new String[] {"s1", "s3", "s5"});
+ Assert.assertEquals(tmpNode.getDevicePath(), insertRowNode.getDevicePath());
+ Assert.assertEquals(tmpNode.isAligned(), insertRowNode.isAligned());
+ Assert.assertArrayEquals(tmpNode.getValues(), insertRowNode.getValues());
+ Assert.assertArrayEquals(
+ tmpNode.getMeasurementSchemas(), insertRowNode.getMeasurementSchemas());
}
private InsertRowNode getInsertRowNode() throws IllegalPathException {
@@ -82,45 +118,83 @@ public class InsertRowNodeSerdeTest {
new PlanNodeId("plannode 1"),
new PartialPath("root.isp.d1"),
false,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes,
+ time,
+ columns,
+ false);
+ }
+
+ private InsertRowNode getInsertRowNodeWithMeasurementSchemas() throws IllegalPathException {
+ long time = 80L;
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ };
+
+ Object[] columns = new Object[5];
+ columns[0] = 5.0;
+ columns[1] = 6.0f;
+ columns[2] = 1000l;
+ columns[3] = 10;
+ columns[4] = true;
+
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ new PlanNodeId("plannode 2"),
+ new PartialPath("root.isp.d2"),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes,
+ time,
+ columns,
+ false);
+
+ insertRowNode.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.DOUBLE),
new MeasurementSchema("s2", TSDataType.FLOAT),
new MeasurementSchema("s3", TSDataType.INT64),
new MeasurementSchema("s4", TSDataType.INT32),
new MeasurementSchema("s5", TSDataType.BOOLEAN)
- },
- dataTypes,
- time,
- columns);
+ });
+
+ return insertRowNode;
}
- private InsertRowNode getInsertRowNodeWithFailedColumn() throws IllegalPathException {
+ private InsertRowNode getInsertRowNodeWithStringValue() throws IllegalPathException {
long time = 110L;
TSDataType[] dataTypes =
new TSDataType[] {
- TSDataType.DOUBLE, null, TSDataType.INT64, null, TSDataType.BOOLEAN,
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
};
Object[] columns = new Object[5];
- columns[0] = 1.0;
- columns[1] = null;
- columns[2] = 10000l;
- columns[3] = null;
- columns[4] = false;
-
- return new InsertRowNode(
- new PlanNodeId("plannode 1"),
- new PartialPath("root.isp.d1"),
- false,
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.DOUBLE),
- null,
- new MeasurementSchema("s3", TSDataType.INT64),
- null,
- new MeasurementSchema("s5", TSDataType.BOOLEAN)
- },
- dataTypes,
- time,
- columns);
+ columns[0] = "1.0";
+ columns[1] = "2.0";
+ columns[2] = "10000";
+ columns[3] = "100";
+ columns[4] = "false";
+
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ new PlanNodeId("plannode 1"),
+ new PartialPath("root.isp.d1"),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ new TSDataType[5],
+ time,
+ columns,
+ false);
+ insertRowNode.setNeedInferType(true);
+ return insertRowNode;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsNodeSerdeTest.java
new file mode 100644
index 0000000000..99f949e532
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsNodeSerdeTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class InsertRowsNodeSerdeTest {
+
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ InsertRowsNode node = new InsertRowsNode(new PlanNodeId("plan node 1"));
+ node.addOneInsertRowNode(
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+ 1000L,
+ new Object[] {1.0, 2f, 300L},
+ false),
+ 0);
+
+ node.addOneInsertRowNode(
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ new PartialPath("root.sg.d2"),
+ false,
+ new String[] {"s1", "s4"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+ 2000L,
+ new Object[] {2.0, false},
+ false),
+ 1);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+ node.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PlanNodeType.INSERT_ROWS.ordinal(), byteBuffer.getShort());
+
+ Assert.assertEquals(InsertRowsNode.deserialize(byteBuffer), node);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
new file mode 100644
index 0000000000..a93cf246a0
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class InsertRowsOfOneDeviceNodeSerdeTest {
+
+ @Test
+ public void TestSerializeAndDeserialize() throws IllegalPathException {
+ PartialPath device = new PartialPath("root.sg.d");
+ InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new PlanNodeId("plan node 1"));
+ node.setDevicePath(device);
+ node.addOneInsertRowNode(
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ device,
+ false,
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+ 1000L,
+ new Object[] {1.0, 2f, 300L},
+ false),
+ 0);
+
+ node.addOneInsertRowNode(
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ device,
+ false,
+ new String[] {"s1", "s4"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+ 2000L,
+ new Object[] {2.0, false},
+ false),
+ 1);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+ node.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.ordinal(), byteBuffer.getShort());
+
+ Assert.assertEquals(InsertRowsOfOneDeviceNode.deserialize(byteBuffer), node);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
index 46abaec468..390ca019bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -24,12 +24,16 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class InsertTabletNodeSerdeTest {
@@ -45,6 +49,43 @@ public class InsertTabletNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+
+ insertTabletNode = getInsertTabletNodeWithSchema();
+ byteBuffer = ByteBuffer.allocate(10000);
+ insertTabletNode.serialize(byteBuffer);
+ byteBuffer.flip();
+
+ Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
+
+ Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+ }
+
+ @Test
+ public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
+ InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+
+ int serializedSize = insertTabletNode.serializedSize();
+
+ Assert.assertEquals(229, serializedSize);
+
+ byte[] bytes = new byte[serializedSize];
+ WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+
+ insertTabletNode.serializeToWAL(walBuffer);
+
+ DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+
+ Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), dataInputStream.readShort());
+
+ InsertTabletNode tmpNode = InsertTabletNode.deserialize(dataInputStream);
+
+ Assert.assertArrayEquals(tmpNode.getTimes(), insertTabletNode.getTimes());
+ Assert.assertEquals(tmpNode.getDevicePath(), insertTabletNode.getDevicePath());
+ Assert.assertEquals(tmpNode.isAligned(), insertTabletNode.isAligned());
+ Assert.assertArrayEquals(tmpNode.getColumns(), insertTabletNode.getColumns());
+ Assert.assertArrayEquals(tmpNode.getBitMaps(), insertTabletNode.getBitMaps());
+ Assert.assertArrayEquals(
+ tmpNode.getMeasurementSchemas(), insertTabletNode.getMeasurementSchemas());
}
private InsertTabletNode getInsertTabletNode() throws IllegalPathException {
@@ -76,13 +117,7 @@ public class InsertTabletNodeSerdeTest {
new PlanNodeId("plannode 1"),
new PartialPath("root.isp.d1"),
false,
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.DOUBLE),
- new MeasurementSchema("s2", TSDataType.FLOAT),
- new MeasurementSchema("s3", TSDataType.INT64),
- new MeasurementSchema("s4", TSDataType.INT32),
- new MeasurementSchema("s5", TSDataType.BOOLEAN)
- },
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes,
times,
null,
@@ -91,4 +126,51 @@ public class InsertTabletNodeSerdeTest {
return tabletNode;
}
+
+ private InsertTabletNode getInsertTabletNodeWithSchema() throws IllegalPathException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ TSDataType[] dataTypes = new TSDataType[5];
+ dataTypes[0] = TSDataType.DOUBLE;
+ dataTypes[1] = TSDataType.FLOAT;
+ dataTypes[2] = TSDataType.INT64;
+ dataTypes[3] = TSDataType.INT32;
+ dataTypes[4] = TSDataType.BOOLEAN;
+
+ Object[] columns = new Object[5];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ }
+
+ InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ new PlanNodeId("plannode 1"),
+ new PartialPath("root.isp.d1"),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes,
+ times,
+ null,
+ columns,
+ times.length);
+ insertTabletNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT32),
+ new MeasurementSchema("s5", TSDataType.BOOLEAN)
+ });
+
+ return insertTabletNode;
+ }
}