You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/18 02:48:08 UTC
[iotdb] branch master updated: Support logic view - move schema validation to analyze phase (#9767)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ef10a20a3e Support logic view - move schema validation to analyze phase (#9767)
ef10a20a3e is described below
commit ef10a20a3e5df126674787fd08208cb1ff06c5a2
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu May 18 10:48:02 2023 +0800
Support logic view - move schema validation to analyze phase (#9767)
---
.../execution/executor/RegionWriteExecutor.java | 40 +----
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 16 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 92 +++++++++--
.../mpp/plan/analyze/schema/SchemaValidator.java | 27 +--
.../db/mpp/plan/execution/QueryExecution.java | 10 +-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 70 +++++---
.../planner/plan/node/write/BatchInsertNode.java | 33 ----
.../plan/node/write/InsertMultiTabletsNode.java | 22 +--
.../plan/planner/plan/node/write/InsertNode.java | 126 +-------------
.../planner/plan/node/write/InsertRowNode.java | 171 +++----------------
.../planner/plan/node/write/InsertRowsNode.java | 33 +---
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 32 +---
.../planner/plan/node/write/InsertTabletNode.java | 145 +++-------------
.../scheduler/FragmentInstanceDispatcherImpl.java | 5 +-
.../plan/statement/crud/InsertBaseStatement.java | 157 +++++++++++++++++-
.../crud/InsertMultiTabletsStatement.java | 30 ++++
.../plan/statement/crud/InsertRowStatement.java | 183 ++++++++++++++++++++-
.../crud/InsertRowsOfOneDeviceStatement.java | 42 +++++
.../plan/statement/crud/InsertRowsStatement.java | 41 +++++
.../plan/statement/crud/InsertTabletStatement.java | 155 ++++++++++++++++-
.../db/wal/recover/file/TsFilePlanRedoer.java | 4 -
.../db/engine/storagegroup/DataRegionTest.java | 16 +-
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 7 +-
.../org/apache/iotdb/db/wal/io/WALFileTest.java | 25 ++-
.../iotdb/db/wal/node/ConsensusReqReaderTest.java | 27 ++-
.../org/apache/iotdb/db/wal/node/WALNodeTest.java | 26 ++-
.../db/wal/recover/file/TsFilePlanRedoerTest.java | 32 ++--
.../file/UnsealedTsFileRecoverPerformerTest.java | 5 +-
28 files changed, 883 insertions(+), 689 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index ab6f614cf5..059cfbc65b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
@@ -37,12 +36,10 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
@@ -226,36 +223,8 @@ public class RegionWriteExecutor {
private RegionExecutionResult executeDataInsert(
InsertNode insertNode, WritePlanNodeExecutionContext context) {
RegionExecutionResult response = new RegionExecutionResult();
- // data insertion should be blocked by data deletion, especially when deleting timeseries
- final long startTime = System.nanoTime();
context.getRegionWriteValidationRWLock().readLock().lock();
try {
- try {
- SchemaValidator.validate(insertNode);
- } catch (SemanticException e) {
- response.setAccepted(false);
- response.setMessage(e.getMessage());
- if (e.getCause() instanceof IoTDBException) {
- IoTDBException ioTDBException = (IoTDBException) e.getCause();
- response.setStatus(
- RpcUtils.getStatus(ioTDBException.getErrorCode(), ioTDBException.getMessage()));
- } else {
- response.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
- }
- return response;
- } finally {
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(
- System.nanoTime() - startTime);
- }
- boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
- String partialInsertMessage = null;
- if (hasFailedMeasurement) {
- partialInsertMessage =
- String.format(
- "Fail to insert measurements %s caused by %s",
- insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
- LOGGER.warn(partialInsertMessage);
- }
ConsensusWriteResponse writeResponse =
fireTriggerAndInsert(context.getRegionId(), insertNode);
@@ -263,17 +232,10 @@ public class RegionWriteExecutor {
// TODO need consider more status
if (writeResponse.getStatus() != null) {
response.setAccepted(
- !hasFailedMeasurement
- && TSStatusCode.SUCCESS_STATUS.getStatusCode()
- == writeResponse.getStatus().getCode());
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != writeResponse.getStatus().getCode()) {
response.setMessage(writeResponse.getStatus().message);
response.setStatus(writeResponse.getStatus());
- } else if (hasFailedMeasurement) {
- response.setMessage(partialInsertMessage);
- response.setStatus(
- RpcUtils.getStatus(
- TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
} else {
response.setMessage(writeResponse.getStatus().message);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 6b3f4780a6..3ff74d6398 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
@@ -80,9 +81,10 @@ public class Analysis {
private boolean finishQueryAfterAnalyze;
- // potential fail message when finishQueryAfterAnalyze is true. If failMessage is NULL, means no
+ // potential fail status when finishQueryAfterAnalyze is true. If failStatus is NULL, means no
// fail.
- private String failMessage;
+
+ private TSStatus failStatus;
/////////////////////////////////////////////////////////////////////////////////////////////////
// Query Analysis (used in ALIGN BY TIME)
@@ -407,15 +409,15 @@ public class Analysis {
}
public boolean isFailed() {
- return failMessage != null;
+ return failStatus != null;
}
- public String getFailMessage() {
- return failMessage;
+ public TSStatus getFailStatus() {
+ return this.failStatus;
}
- public void setFailMessage(String failMessage) {
- this.failMessage = failMessage;
+ public void setFailStatus(TSStatus status) {
+ this.failStatus = status;
}
public void setDeviceViewInputIndexesMap(Map<String, List<Integer>> deviceViewInputIndexesMap) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a5c7bafe36..7313a600df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -31,6 +32,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeClientManager;
@@ -92,6 +94,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -137,6 +140,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -206,6 +210,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
+ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+ PerformanceOverviewMetrics.getInstance();
+
public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
@@ -2031,32 +2038,48 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(insertTabletStatement);
+ validateSchema(analysis, insertTabletStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(insertTabletStatement.getTimePartitionSlots());
- return getAnalysisForWriting(
- insertTabletStatement, Collections.singletonList(dataPartitionQueryParam));
+ return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
}
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(insertRowStatement);
+ validateSchema(analysis, insertRowStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
dataPartitionQueryParam.setTimePartitionSlotList(
Collections.singletonList(insertRowStatement.getTimePartitionSlot()));
- return getAnalysisForWriting(
- insertRowStatement, Collections.singletonList(dataPartitionQueryParam));
+ return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
}
@Override
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(insertRowsStatement);
+ validateSchema(analysis, insertRowsStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
@@ -2074,13 +2097,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
- return getAnalysisForWriting(insertRowsStatement, dataPartitionQueryParams);
+ return getAnalysisForWriting(analysis, dataPartitionQueryParams);
}
@Override
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(insertMultiTabletsStatement);
+ validateSchema(analysis, insertMultiTabletsStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
for (InsertTabletStatement insertTabletStatement :
@@ -2099,13 +2128,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
- return getAnalysisForWriting(insertMultiTabletsStatement, dataPartitionQueryParams);
+ return getAnalysisForWriting(analysis, dataPartitionQueryParams);
}
@Override
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(insertRowsOfOneDeviceStatement);
+ validateSchema(analysis, insertRowsOfOneDeviceStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(
@@ -2113,8 +2148,37 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
dataPartitionQueryParam.setTimePartitionSlotList(
insertRowsOfOneDeviceStatement.getTimePartitionSlots());
- return getAnalysisForWriting(
- insertRowsOfOneDeviceStatement, Collections.singletonList(dataPartitionQueryParam));
+ return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
+ }
+
+ private void validateSchema(Analysis analysis, InsertBaseStatement insertStatement) {
+ final long startTime = System.nanoTime();
+ try {
+ SchemaValidator.validate(schemaFetcher, insertStatement);
+ } catch (SemanticException e) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ if (e.getCause() instanceof IoTDBException) {
+ IoTDBException exception = (IoTDBException) e.getCause();
+ analysis.setFailStatus(
+ RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage()));
+ } else {
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
+ }
+ return;
+ } finally {
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
+ }
+ boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
+ String partialInsertMessage;
+ if (hasFailedMeasurement) {
+ partialInsertMessage =
+ String.format(
+ "Fail to insert measurements %s caused by %s",
+ insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages());
+ logger.warn(partialInsertMessage);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
+ }
}
@Override
@@ -2199,16 +2263,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
/** get analysis according to statement and params */
private Analysis getAnalysisForWriting(
- Statement statement, List<DataPartitionQueryParam> dataPartitionQueryParams) {
- Analysis analysis = new Analysis();
- analysis.setStatement(statement);
+ Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams) {
DataPartition dataPartition =
partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
if (dataPartition.isEmpty()) {
analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailMessage(
- "Database not exists and failed to create automatically because enable_auto_create_schema is FALSE.");
+ analysis.setFailStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
+ "Database not exists and failed to create automatically "
+ + "because enable_auto_create_schema is FALSE."));
}
analysis.setDataPartitionInfo(dataPartition);
return analysis;
@@ -2359,6 +2424,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
return SchemaValidator.validate(
+ schemaFetcher,
deviceList,
measurementList,
dataTypeList,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 6cbd587a00..5005b9f7a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -33,30 +35,31 @@ import java.util.List;
public class SchemaValidator {
- private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
-
- public static void validate(InsertNode insertNode) {
+ public static void validate(ISchemaFetcher schemaFetcher, InsertBaseStatement insertStatement) {
try {
- if (insertNode instanceof BatchInsertNode) {
- SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
- ((BatchInsertNode) insertNode).getSchemaValidationList());
+ if (insertStatement instanceof InsertRowsStatement
+ || insertStatement instanceof InsertMultiTabletsStatement
+ || insertStatement instanceof InsertRowsOfOneDeviceStatement) {
+ schemaFetcher.fetchAndComputeSchemaWithAutoCreate(
+ insertStatement.getSchemaValidationList());
} else {
- SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertNode.getSchemaValidation());
+ schemaFetcher.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidation());
}
- insertNode.updateAfterSchemaValidation();
+ insertStatement.updateAfterSchemaValidation();
} catch (QueryProcessException e) {
- throw new SemanticException(e);
+ throw new SemanticException(e.getMessage());
}
}
public static ISchemaTree validate(
+ ISchemaFetcher schemaFetcher,
List<PartialPath> devicePaths,
List<String[]> measurements,
List<TSDataType[]> dataTypes,
List<TSEncoding[]> encodings,
List<CompressionType[]> compressionTypes,
List<Boolean> isAlignedList) {
- return SCHEMA_FETCHER.fetchSchemaListWithAutoCreate(
+ return schemaFetcher.fetchSchemaListWithAutoCreate(
devicePaths, measurements, dataTypes, encodings, compressionTypes, isAlignedList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 715cf29c71..e6bcdeb728 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -202,7 +202,7 @@ public class QueryExecution implements IQueryExecution {
if (skipExecute()) {
logger.debug("[SkipExecute]");
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
- stateMachine.transitionToFailed(new RuntimeException(analysis.getFailMessage()));
+ stateMachine.transitionToFailed(analysis.getFailStatus());
} else {
constructResultForMemorySource();
stateMachine.transitionToRunning();
@@ -224,6 +224,14 @@ public class QueryExecution implements IQueryExecution {
}
PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime);
schedule();
+
+ // Set the partial insert error message.
+ // When some columns in one insert fail, the other columns still continue to be inserted.
+ // The error message should be returned to the client, therefore we need to set it after the
+ // insertion of the other columns has finished.
+ if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
+ stateMachine.transitionToFailed(analysis.getFailStatus());
+ }
}
private void checkTimeOutForQuery() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 1d02ae17c5..ed82734b76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -445,30 +445,38 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
// convert insert statement to insert node
- return new InsertTabletNode(
- context.getQueryId().genPlanNodeId(),
- insertTabletStatement.getDevicePath(),
- insertTabletStatement.isAligned(),
- insertTabletStatement.getMeasurements(),
- insertTabletStatement.getDataTypes(),
- insertTabletStatement.getTimes(),
- insertTabletStatement.getBitMaps(),
- insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount());
+ InsertTabletNode insertNode =
+ new InsertTabletNode(
+ context.getQueryId().genPlanNodeId(),
+ insertTabletStatement.getDevicePath(),
+ insertTabletStatement.isAligned(),
+ insertTabletStatement.getMeasurements(),
+ insertTabletStatement.getDataTypes(),
+ insertTabletStatement.getMeasurementSchemas(),
+ insertTabletStatement.getTimes(),
+ insertTabletStatement.getBitMaps(),
+ insertTabletStatement.getColumns(),
+ insertTabletStatement.getRowCount());
+ insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
+ return insertNode;
}
@Override
public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
// convert insert statement to insert node
- return new InsertRowNode(
- context.getQueryId().genPlanNodeId(),
- insertRowStatement.getDevicePath(),
- insertRowStatement.isAligned(),
- insertRowStatement.getMeasurements(),
- insertRowStatement.getDataTypes(),
- insertRowStatement.getTime(),
- insertRowStatement.getValues(),
- insertRowStatement.isNeedInferType());
+ InsertRowNode insertNode =
+ new InsertRowNode(
+ context.getQueryId().genPlanNodeId(),
+ insertRowStatement.getDevicePath(),
+ insertRowStatement.isAligned(),
+ insertRowStatement.getMeasurements(),
+ insertRowStatement.getDataTypes(),
+ insertRowStatement.getMeasurementSchemas(),
+ insertRowStatement.getTime(),
+ insertRowStatement.getValues(),
+ insertRowStatement.isNeedInferType());
+ insertNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+ return insertNode;
}
@Override
@@ -629,17 +637,19 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
InsertRowStatement insertRowStatement =
insertRowsStatement.getInsertRowStatementList().get(i);
- insertRowsNode.addOneInsertRowNode(
+ InsertRowNode insertRowNode =
new InsertRowNode(
insertRowsNode.getPlanNodeId(),
insertRowStatement.getDevicePath(),
insertRowStatement.isAligned(),
insertRowStatement.getMeasurements(),
insertRowStatement.getDataTypes(),
+ insertRowStatement.getMeasurementSchemas(),
insertRowStatement.getTime(),
insertRowStatement.getValues(),
- insertRowStatement.isNeedInferType()),
- i);
+ insertRowStatement.isNeedInferType());
+ insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+ insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
return insertRowsNode;
}
@@ -653,18 +663,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) {
InsertTabletStatement insertTabletStatement =
insertMultiTabletsStatement.getInsertTabletStatementList().get(i);
- insertMultiTabletsNode.addInsertTabletNode(
+ InsertTabletNode insertTabletNode =
new InsertTabletNode(
insertMultiTabletsNode.getPlanNodeId(),
insertTabletStatement.getDevicePath(),
insertTabletStatement.isAligned(),
insertTabletStatement.getMeasurements(),
insertTabletStatement.getDataTypes(),
+ insertTabletStatement.getMeasurementSchemas(),
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount()),
- i);
+ insertTabletStatement.getRowCount());
+ insertTabletNode.setFailedMeasurementNumber(
+ insertTabletStatement.getFailedMeasurementNumber());
+ insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, i);
}
return insertMultiTabletsNode;
}
@@ -681,16 +694,19 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
InsertRowStatement insertRowStatement =
insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
- insertRowNodeList.add(
+ InsertRowNode insertRowNode =
new InsertRowNode(
insertRowsOfOneDeviceNode.getPlanNodeId(),
insertRowStatement.getDevicePath(),
insertRowStatement.isAligned(),
insertRowStatement.getMeasurements(),
insertRowStatement.getDataTypes(),
+ insertRowStatement.getMeasurementSchemas(),
insertRowStatement.getTime(),
insertRowStatement.getValues(),
- insertRowStatement.isNeedInferType()));
+ insertRowStatement.isNeedInferType());
+ insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+ insertRowNodeList.add(insertRowNode);
insertRowNodeIndexList.add(i);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/BatchInsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/BatchInsertNode.java
deleted file mode 100644
index a0c774d5c4..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/BatchInsertNode.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
-
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
-
-import java.util.List;
-
-/**
- * BatchInsertNode contains multiple sub insert. Insert node which contains multiple sub insert
- * nodes needs to implement it.
- */
-public interface BatchInsertNode {
-
- List<ISchemaValidation> getSchemaValidationList();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index d0b46b103a..5a4e198b77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -22,14 +22,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -40,9 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
-public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNode {
+public class InsertMultiTabletsNode extends InsertNode {
/**
* the value is used to indicate the parent InsertTabletNode's index when the parent
@@ -135,11 +132,6 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
}
- @Override
- protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
- return false;
- }
-
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
@@ -193,13 +185,6 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
return null;
}
- @Override
- public List<ISchemaValidation> getSchemaValidationList() {
- return insertTabletNodeList.stream()
- .map(InsertTabletNode::getSchemaValidation)
- .collect(Collectors.toList());
- }
-
public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId;
List<InsertTabletNode> insertTabletNodeList = new ArrayList<>();
@@ -278,9 +263,4 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
public long getMinTime() {
throw new NotImplementedException();
}
-
- @Override
- public Object getFirstValueOfIndex(int index) {
- throw new NotImplementedException();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 07c4b86c06..c191f03837 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,13 +20,10 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -41,11 +38,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
public abstract class InsertNode extends WritePlanNode {
@@ -62,11 +55,8 @@ public abstract class InsertNode extends WritePlanNode {
protected MeasurementSchema[] measurementSchemas;
protected String[] measurements;
protected TSDataType[] dataTypes;
- // TODO(INSERT) need to change it to a function handle to update last time value
- // protected IMeasurementMNode[] measurementMNodes;
- /** index of failed measurements -> info including measurement, data type and value */
- protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+ protected int failedMeasurementNumber = 0;
/**
* device id reference, for reuse device id in both id table and memtable <br>
@@ -242,56 +232,8 @@ public abstract class InsertNode extends WritePlanNode {
return dataRegionReplicaSet;
}
- public ISchemaValidation getSchemaValidation() {
- throw new UnsupportedOperationException();
- }
-
- public void updateAfterSchemaValidation() throws QueryProcessException {}
-
- /** Check whether data types are matched with measurement schemas */
- protected void selfCheckDataTypes(int index)
- throws DataTypeMismatchException, PathNotExistException {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- // if enable partial insert, mark failed measurements with exception
- if (measurementSchemas[index] == null) {
- markFailedMeasurement(
- index,
- new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath()));
- } else if ((dataTypes[index] != measurementSchemas[index].getType()
- && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
- markFailedMeasurement(
- index,
- new DataTypeMismatchException(
- devicePath.getFullPath(),
- measurements[index],
- dataTypes[index],
- measurementSchemas[index].getType(),
- getMinTime(),
- getFirstValueOfIndex(index)));
- }
- } else {
- // if not enable partial insert, throw the exception directly
- if (measurementSchemas[index] == null) {
- throw new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath());
- } else if ((dataTypes[index] != measurementSchemas[index].getType()
- && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
- throw new DataTypeMismatchException(
- devicePath.getFullPath(),
- measurements[index],
- dataTypes[index],
- measurementSchemas[index].getType(),
- getMinTime(),
- getFirstValueOfIndex(index));
- }
- }
- }
-
- protected abstract boolean checkAndCastDataType(int columnIndex, TSDataType dataType);
-
public abstract long getMinTime();
- public abstract Object getFirstValueOfIndex(int index);
-
/**
* Notice: Call this method ONLY when using IOT_CONSENSUS, other consensus protocol cannot
* distinguish whether the insertNode sync from leader by this method.
@@ -302,16 +244,8 @@ public abstract class InsertNode extends WritePlanNode {
}
// region partial insert
- /**
- * Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
- * be null. We'd better use "measurements[index] == null" to determine if the measurement failed.
- * <br>
- * This method is not concurrency-safe.
- *
- * @param index failed measurement index
- * @param cause cause Exception of failure
- */
- public void markFailedMeasurement(int index, Exception cause) {
+ @TestOnly
+ public void markFailedMeasurement(int index) {
throw new UnsupportedOperationException();
}
@@ -324,58 +258,12 @@ public abstract class InsertNode extends WritePlanNode {
return false;
}
- public boolean hasFailedMeasurements() {
- return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
+ public void setFailedMeasurementNumber(int failedMeasurementNumber) {
+ this.failedMeasurementNumber = failedMeasurementNumber;
}
public int getFailedMeasurementNumber() {
- return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
- }
-
- public List<String> getFailedMeasurements() {
- return failedMeasurementIndex2Info == null
- ? Collections.emptyList()
- : failedMeasurementIndex2Info.values().stream()
- .map(info -> info.measurement)
- .collect(Collectors.toList());
- }
-
- public List<Exception> getFailedExceptions() {
- return failedMeasurementIndex2Info == null
- ? Collections.emptyList()
- : failedMeasurementIndex2Info.values().stream()
- .map(info -> info.cause)
- .collect(Collectors.toList());
- }
-
- public List<String> getFailedMessages() {
- return failedMeasurementIndex2Info == null
- ? Collections.emptyList()
- : failedMeasurementIndex2Info.values().stream()
- .map(
- info -> {
- Throwable cause = info.cause;
- while (cause.getCause() != null) {
- cause = cause.getCause();
- }
- return cause.getMessage();
- })
- .collect(Collectors.toList());
- }
-
- protected static class FailedMeasurementInfo {
- protected String measurement;
- protected TSDataType dataType;
- protected Object value;
- protected Exception cause;
-
- public FailedMeasurementInfo(
- String measurement, TSDataType dataType, Object value, Exception cause) {
- this.measurement = measurement;
- this.dataType = dataType;
- this.value = value;
- this.cause = cause;
- }
+ return failedMeasurementNumber;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index e6f5a4099a..d7839f0997 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -19,56 +19,38 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Objects;
-public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaValidation {
-
- private static final Logger logger = LoggerFactory.getLogger(InsertRowNode.class);
+public class InsertRowNode extends InsertNode implements WALEntryValue {
private static final byte TYPE_RAW_STRING = -1;
@@ -83,16 +65,34 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
super(id);
}
+ @TestOnly
+ public InsertRowNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ long time,
+ Object[] values,
+ boolean isNeedInferType) {
+ super(id, devicePath, isAligned, measurements, dataTypes);
+ this.time = time;
+ this.values = values;
+ this.isNeedInferType = isNeedInferType;
+ }
+
public InsertRowNode(
PlanNodeId id,
PartialPath devicePath,
boolean isAligned,
String[] measurements,
TSDataType[] dataTypes,
+ MeasurementSchema[] measurementSchemas,
long time,
Object[] values,
boolean isNeedInferType) {
super(id, devicePath, isAligned, measurements, dataTypes);
+ this.measurementSchemas = measurementSchemas;
this.time = time;
this.values = values;
this.isNeedInferType = isNeedInferType;
@@ -157,16 +157,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
}
}
- @Override
- public TSEncoding getEncoding(int index) {
- return null;
- }
-
- @Override
- public CompressionType getCompressionType(int index) {
- return null;
- }
-
public Object[] getValues() {
return values;
}
@@ -197,81 +187,10 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
}
@Override
- protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
- if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
- logger.warn(
- "Inserting to {}.{} : Cast from {} to {}",
- devicePath,
- measurements[columnIndex],
- dataTypes[columnIndex],
- dataType);
- values[columnIndex] =
- CommonUtils.castValue(dataTypes[columnIndex], dataType, values[columnIndex]);
- dataTypes[columnIndex] = dataType;
- return true;
- }
- return false;
- }
-
- /**
- * transfer String[] values to specific data types when isNeedInferType is true. <br>
- * Notice: measurementSchemas must be initialized before calling this method
- */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public void transferType() throws QueryProcessException {
-
- for (int i = 0; i < measurementSchemas.length; i++) {
- // null when time series doesn't exist
- if (measurementSchemas[i] == null) {
- if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- throw new QueryProcessException(
- new PathNotExistException(
- devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
- } else {
- markFailedMeasurement(
- i,
- new QueryProcessException(
- new PathNotExistException(
- devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
- }
- continue;
- }
- // parse string value to specific type
- dataTypes[i] = measurementSchemas[i].getType();
- try {
- values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
- } catch (Exception e) {
- logger.warn(
- "data type of {}.{} is not consistent, registered type {}, inserting timestamp {}, value {}",
- devicePath,
- measurements[i],
- dataTypes[i],
- time,
- values[i]);
- if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- throw e;
- } else {
- markFailedMeasurement(i, e);
- }
- }
- }
- isNeedInferType = false;
- }
-
- @Override
- public void markFailedMeasurement(int index, Exception cause) {
+ public void markFailedMeasurement(int index) {
if (measurements[index] == null) {
return;
}
-
- if (failedMeasurementIndex2Info == null) {
- failedMeasurementIndex2Info = new HashMap<>();
- }
-
- FailedMeasurementInfo failedMeasurementInfo =
- new FailedMeasurementInfo(measurements[index], dataTypes[index], values[index], cause);
- failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
-
measurements[index] = null;
dataTypes[index] = null;
values[index] = null;
@@ -527,11 +446,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
return getTime();
}
- @Override
- public Object getFirstValueOfIndex(int index) {
- return values[index];
- }
-
// region serialize & deserialize methods for WAL
/** Serialized size for wal */
@Override
@@ -810,49 +724,4 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
Object value = values[columnIndex];
return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value));
}
-
- @Override
- public void validateDeviceSchema(boolean isAligned) {
- if (this.isAligned != isAligned) {
- throw new SemanticException(
- new AlignedTimeseriesException(
- String.format(
- "timeseries under this device are%s aligned, " + "please use %s interface",
- isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
- devicePath.getFullPath()));
- }
- }
-
- @Override
- public ISchemaValidation getSchemaValidation() {
- return this;
- }
-
- @Override
- public void updateAfterSchemaValidation() throws QueryProcessException {
- if (isNeedInferType) {
- transferType();
- }
- }
-
- @Override
- public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
- if (measurementSchemas == null) {
- measurementSchemas = new MeasurementSchema[measurements.length];
- }
- if (measurementSchemaInfo == null) {
- measurementSchemas[index] = null;
- } else {
- measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
- }
- if (isNeedInferType) {
- return;
- }
-
- try {
- selfCheckDataTypes(index);
- } catch (DataTypeMismatchException | PathNotExistException e) {
- throw new SemanticException(e);
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 5921c033f6..050839f642 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -22,9 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -32,7 +30,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -43,9 +40,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
-public class InsertRowsNode extends InsertNode implements BatchInsertNode {
+public class InsertRowsNode extends InsertNode {
/**
* Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -121,11 +117,6 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
@Override
public void addChild(PlanNode child) {}
- @Override
- protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
- return false;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -156,23 +147,6 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
return null;
}
- @Override
- public List<ISchemaValidation> getSchemaValidationList() {
- return insertRowNodeList.stream()
- .map(InsertRowNode::getSchemaValidation)
- .collect(Collectors.toList());
- }
-
- @Override
- public void updateAfterSchemaValidation() throws QueryProcessException {
- for (InsertRowNode insertRowNode : insertRowNodeList) {
- insertRowNode.updateAfterSchemaValidation();
- if (!this.hasFailedMeasurements() && insertRowNode.hasFailedMeasurements()) {
- this.failedMeasurementIndex2Info = insertRowNode.failedMeasurementIndex2Info;
- }
- }
- }
-
public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId;
List<InsertRowNode> insertRowNodeList = new ArrayList<>();
@@ -266,9 +240,4 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
-
- @Override
- public Object getFirstValueOfIndex(int index) {
- throw new NotImplementedException();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 32b3e1c08d..dd788ea4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -47,9 +45,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
-public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsertNode {
+public class InsertRowsOfOneDeviceNode extends InsertNode {
/**
* Suppose there is an InsertRowsOfOneDeviceNode, which contains 5 InsertRowNodes,
@@ -144,11 +141,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
return null;
}
- @Override
- protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
- return false;
- }
-
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
List<WritePlanNode> result = new ArrayList<>();
@@ -291,23 +283,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
return Objects.hash(super.hashCode(), insertRowNodeIndexList, insertRowNodeList);
}
- @Override
- public List<ISchemaValidation> getSchemaValidationList() {
- return insertRowNodeList.stream()
- .map(InsertRowNode::getSchemaValidation)
- .collect(Collectors.toList());
- }
-
- @Override
- public void updateAfterSchemaValidation() throws QueryProcessException {
- for (InsertRowNode insertRowNode : insertRowNodeList) {
- insertRowNode.updateAfterSchemaValidation();
- if (!this.hasFailedMeasurements() && insertRowNode.hasFailedMeasurements()) {
- this.failedMeasurementIndex2Info = insertRowNode.failedMeasurementIndex2Info;
- }
- }
- }
-
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertRowsOfOneDevice(this, context);
@@ -317,9 +292,4 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
public long getMinTime() {
throw new NotImplementedException();
}
-
- @Override
- public Object getFirstValueOfIndex(int index) {
- throw new NotImplementedException();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 09fa7c92b7..2501c535a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -23,19 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -43,9 +36,7 @@ import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -54,9 +45,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -69,9 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class InsertTabletNode extends InsertNode implements WALEntryValue, ISchemaValidation {
-
- private static final Logger logger = LoggerFactory.getLogger(InsertTabletNode.class);
+public class InsertTabletNode extends InsertNode implements WALEntryValue {
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
@@ -97,17 +83,37 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
super(id);
}
+ @TestOnly
+ public InsertTabletNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ long[] times,
+ BitMap[] bitMaps,
+ Object[] columns,
+ int rowCount) {
+ super(id, devicePath, isAligned, measurements, dataTypes);
+ this.times = times;
+ this.bitMaps = bitMaps;
+ this.columns = columns;
+ this.rowCount = rowCount;
+ }
+
public InsertTabletNode(
PlanNodeId id,
PartialPath devicePath,
boolean isAligned,
String[] measurements,
TSDataType[] dataTypes,
+ MeasurementSchema[] measurementSchemas,
long[] times,
BitMap[] bitMaps,
Object[] columns,
int rowCount) {
super(id, devicePath, isAligned, measurements, dataTypes);
+ this.measurementSchemas = measurementSchemas;
this.times = times;
this.bitMaps = bitMaps;
this.columns = columns;
@@ -177,23 +183,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
return null;
}
- @Override
- protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
- if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
- logger.warn(
- "Inserting to {}.{} : Cast from {} to {}",
- devicePath,
- measurements[columnIndex],
- dataTypes[columnIndex],
- dataType);
- columns[columnIndex] =
- CommonUtils.castArray(dataTypes[columnIndex], dataType, columns[columnIndex]);
- dataTypes[columnIndex] = dataType;
- return true;
- }
- return false;
- }
-
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
// only single device in single database
@@ -287,6 +276,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
isAligned,
measurements,
dataTypes,
+ measurementSchemas,
subTimes,
bitMaps,
values,
@@ -357,19 +347,10 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
@Override
- public void markFailedMeasurement(int index, Exception cause) {
+ public void markFailedMeasurement(int index) {
if (measurements[index] == null) {
return;
}
-
- if (failedMeasurementIndex2Info == null) {
- failedMeasurementIndex2Info = new HashMap<>();
- }
-
- FailedMeasurementInfo failedMeasurementInfo =
- new FailedMeasurementInfo(measurements[index], dataTypes[index], columns[index], cause);
- failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
-
measurements[index] = null;
dataTypes[index] = null;
columns[index] = null;
@@ -380,41 +361,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
return times[0];
}
- @Override
- public Object getFirstValueOfIndex(int index) {
- Object value;
- switch (dataTypes[index]) {
- case INT32:
- int[] intValues = (int[]) columns[index];
- value = intValues[0];
- break;
- case INT64:
- long[] longValues = (long[]) columns[index];
- value = longValues[0];
- break;
- case FLOAT:
- float[] floatValues = (float[]) columns[index];
- value = floatValues[0];
- break;
- case DOUBLE:
- double[] doubleValues = (double[]) columns[index];
- value = doubleValues[0];
- break;
- case BOOLEAN:
- boolean[] boolValues = (boolean[]) columns[index];
- value = boolValues[0];
- break;
- case TEXT:
- Binary[] binaryValues = (Binary[]) columns[index];
- value = binaryValues[0];
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[index]));
- }
- return value;
- }
-
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.INSERT_TABLET.serialize(byteBuffer);
@@ -1122,49 +1068,4 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
return new TimeValuePair(times[lastIdx], value);
}
-
- @Override
- public TSEncoding getEncoding(int index) {
- return null;
- }
-
- @Override
- public CompressionType getCompressionType(int index) {
- return null;
- }
-
- @Override
- public void validateDeviceSchema(boolean isAligned) {
- if (this.isAligned != isAligned) {
- throw new SemanticException(
- new AlignedTimeseriesException(
- String.format(
- "timeseries under this device are%s aligned, " + "please use %s interface",
- isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
- devicePath.getFullPath()));
- }
- }
-
- @Override
- public ISchemaValidation getSchemaValidation() {
- return this;
- }
-
- @Override
- public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
- if (measurementSchemas == null) {
- measurementSchemas = new MeasurementSchema[measurements.length];
- }
- if (measurementSchemaInfo == null) {
- measurementSchemas[index] = null;
- } else {
- measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
- }
-
- try {
- selfCheckDataTypes(index);
- } catch (DataTypeMismatchException | PathNotExistException e) {
- throw new SemanticException(e);
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cb64acaf6b..8d91843fef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -166,6 +166,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInstance> instances) {
+ List<TSStatus> dataNodeFailureList = new ArrayList<>();
// split local and remote instances
List<FragmentInstance> localInstances = new ArrayList<>();
List<FragmentInstance> remoteInstances = new ArrayList<>();
@@ -182,14 +183,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
asyncPlanNodeSender.sendAll();
- List<TSStatus> dataNodeFailureList = new ArrayList<>();
-
if (!localInstances.isEmpty()) {
// sync dispatch to local
long localScheduleStartTime = System.nanoTime();
for (FragmentInstance localInstance : localInstances) {
try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
- dispatchOneInstance(localInstance);
+ dispatchLocally(localInstance);
} catch (FragmentInstanceDispatchException e) {
dataNodeFailureList.add(e.getFailureStatus());
} catch (Throwable t) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index d2d6d55ea1..d6c9941c58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -19,11 +19,19 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
public abstract class InsertBaseStatement extends Statement {
@@ -35,10 +43,15 @@ public abstract class InsertBaseStatement extends Statement {
protected boolean isAligned;
+ protected MeasurementSchema[] measurementSchemas;
+
protected String[] measurements;
// get from client
protected TSDataType[] dataTypes;
+ /** index of failed measurements -> info including measurement, data type and value */
+ protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+
public PartialPath getDevicePath() {
return devicePath;
}
@@ -55,12 +68,12 @@ public abstract class InsertBaseStatement extends Statement {
this.measurements = measurements;
}
- public TSDataType[] getDataTypes() {
- return dataTypes;
+ public MeasurementSchema[] getMeasurementSchemas() {
+ return measurementSchemas;
}
- public void setDataTypes(TSDataType[] dataTypes) {
- this.dataTypes = dataTypes;
+ public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+ this.measurementSchemas = measurementSchemas;
}
public boolean isAligned() {
@@ -71,6 +84,14 @@ public abstract class InsertBaseStatement extends Statement {
isAligned = aligned;
}
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(TSDataType[] dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
/** Returns true when this statement is empty and no need to write into the server */
public abstract boolean isEmpty();
@@ -78,4 +99,132 @@ public abstract class InsertBaseStatement extends Statement {
public List<PartialPath> getPaths() {
return Collections.emptyList();
}
+
+ public abstract ISchemaValidation getSchemaValidation();
+
+ public abstract List<ISchemaValidation> getSchemaValidationList();
+
+ public void updateAfterSchemaValidation() throws QueryProcessException {}
+
+ /** Check whether data types are matched with measurement schemas */
+ protected void selfCheckDataTypes(int index)
+ throws DataTypeMismatchException, PathNotExistException {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ // if enable partial insert, mark failed measurements with exception
+ if (measurementSchemas[index] == null) {
+ markFailedMeasurement(
+ index,
+ new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath()));
+ } else if ((dataTypes[index] != measurementSchemas[index].getType()
+ && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
+ markFailedMeasurement(
+ index,
+ new DataTypeMismatchException(
+ devicePath.getFullPath(),
+ measurements[index],
+ dataTypes[index],
+ measurementSchemas[index].getType(),
+ getMinTime(),
+ getFirstValueOfIndex(index)));
+ }
+ } else {
+ // if not enable partial insert, throw the exception directly
+ if (measurementSchemas[index] == null) {
+ throw new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath());
+ } else if ((dataTypes[index] != measurementSchemas[index].getType()
+ && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
+ throw new DataTypeMismatchException(
+ devicePath.getFullPath(),
+ measurements[index],
+ dataTypes[index],
+ measurementSchemas[index].getType(),
+ getMinTime(),
+ getFirstValueOfIndex(index));
+ }
+ }
+ }
+
+ protected abstract boolean checkAndCastDataType(int columnIndex, TSDataType dataType);
+
+ public abstract long getMinTime();
+
+ public abstract Object getFirstValueOfIndex(int index);
+
+ // region partial insert
+ /**
+ * Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
+ * be null. We'd better use "measurements[index] == null" to determine if the measurement failed.
+ * <br>
+ * This method is not concurrency-safe.
+ *
+ * @param index failed measurement index
+ * @param cause cause Exception of failure
+ */
+ public void markFailedMeasurement(int index, Exception cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean hasValidMeasurements() {
+ for (Object o : measurements) {
+ if (o != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasFailedMeasurements() {
+ return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
+ }
+
+ public int getFailedMeasurementNumber() {
+ return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
+ }
+
+ public List<String> getFailedMeasurements() {
+ return failedMeasurementIndex2Info == null
+ ? Collections.emptyList()
+ : failedMeasurementIndex2Info.values().stream()
+ .map(info -> info.measurement)
+ .collect(Collectors.toList());
+ }
+
+ public List<Exception> getFailedExceptions() {
+ return failedMeasurementIndex2Info == null
+ ? Collections.emptyList()
+ : failedMeasurementIndex2Info.values().stream()
+ .map(info -> info.cause)
+ .collect(Collectors.toList());
+ }
+
+ public List<String> getFailedMessages() {
+ return failedMeasurementIndex2Info == null
+ ? Collections.emptyList()
+ : failedMeasurementIndex2Info.values().stream()
+ .map(
+ info -> {
+ Throwable cause = info.cause;
+ while (cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ return cause.getMessage();
+ })
+ .collect(Collectors.toList());
+ }
+
+ protected static class FailedMeasurementInfo {
+ protected String measurement;
+ protected TSDataType dataType;
+ protected Object value;
+ protected Exception cause;
+
+ public FailedMeasurementInfo(
+ String measurement, TSDataType dataType, Object value, Exception cause) {
+ this.measurement = measurement;
+ this.dataType = dataType;
+ this.value = value;
+ this.cause = cause;
+ }
+ }
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index 77090231e6..2ee89dc69c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class InsertMultiTabletsStatement extends InsertBaseStatement {
@@ -95,4 +98,31 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement {
}
return result;
}
+
+ @Override
+ public ISchemaValidation getSchemaValidation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ISchemaValidation> getSchemaValidationList() {
+ return insertTabletStatementList.stream()
+ .map(InsertTabletStatement::getSchemaValidation)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+ return false;
+ }
+
+ @Override
+ public long getMinTime() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getFirstValueOfIndex(int index) {
+ throw new NotImplementedException();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 7fc5659d90..094cfe0739 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -20,19 +20,38 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-public class InsertRowStatement extends InsertBaseStatement {
+public class InsertRowStatement extends InsertBaseStatement implements ISchemaValidation {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InsertRowStatement.class);
private static final byte TYPE_RAW_STRING = -1;
private static final byte TYPE_NULL = -2;
@@ -130,4 +149,166 @@ public class InsertRowStatement extends InsertBaseStatement {
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRow(this, context);
}
+
+ @Override
+ public long getMinTime() {
+ return getTime();
+ }
+
+ @Override
+ public Object getFirstValueOfIndex(int index) {
+ return values[index];
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+ if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
+ LOGGER.warn(
+ "Inserting to {}.{} : Cast from {} to {}",
+ devicePath,
+ measurements[columnIndex],
+ dataTypes[columnIndex],
+ dataType);
+ values[columnIndex] =
+ CommonUtils.castValue(dataTypes[columnIndex], dataType, values[columnIndex]);
+ dataTypes[columnIndex] = dataType;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * transfer String[] values to specific data types when isNeedInferType is true. <br>
+ * Notice: measurementSchemas must be initialized before calling this method
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public void transferType() throws QueryProcessException {
+
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ // null when time series doesn't exist
+ if (measurementSchemas[i] == null) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ throw new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+ } else {
+ markFailedMeasurement(
+ i,
+ new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
+ }
+ continue;
+ }
+ // parse string value to specific type
+ dataTypes[i] = measurementSchemas[i].getType();
+ try {
+ values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+ } catch (Exception e) {
+ LOGGER.warn(
+ "data type of {}.{} is not consistent, "
+ + "registered type {}, inserting timestamp {}, value {}",
+ devicePath,
+ measurements[i],
+ dataTypes[i],
+ time,
+ values[i]);
+ if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ throw e;
+ } else {
+ markFailedMeasurement(i, e);
+ }
+ }
+ }
+ isNeedInferType = false;
+ }
+
+ @Override
+ public void markFailedMeasurement(int index, Exception cause) {
+ if (measurements[index] == null) {
+ return;
+ }
+
+ if (failedMeasurementIndex2Info == null) {
+ failedMeasurementIndex2Info = new HashMap<>();
+ }
+
+ InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
+ new InsertBaseStatement.FailedMeasurementInfo(
+ measurements[index], dataTypes[index], values[index], cause);
+ failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
+
+ measurements[index] = null;
+ dataTypes[index] = null;
+ values[index] = null;
+ }
+
+ @Override
+ public ISchemaValidation getSchemaValidation() {
+ return this;
+ }
+
+ @Override
+ public List<ISchemaValidation> getSchemaValidationList() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void updateAfterSchemaValidation() throws QueryProcessException {
+ if (isNeedInferType) {
+ transferType();
+ }
+ }
+
+ @Override
+ public TSDataType getDataType(int index) {
+ if (isNeedInferType) {
+ return TypeInferenceUtils.getPredictedDataType(values[index], true);
+ } else {
+ return dataTypes[index];
+ }
+ }
+
+ @Override
+ public TSEncoding getEncoding(int index) {
+ return null;
+ }
+
+ @Override
+ public CompressionType getCompressionType(int index) {
+ return null;
+ }
+
+ @Override
+ public void validateDeviceSchema(boolean isAligned) {
+ if (this.isAligned != isAligned) {
+ throw new SemanticException(
+ new AlignedTimeseriesException(
+ String.format(
+ "timeseries under this device are%s aligned, " + "please use %s interface",
+ isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
+ devicePath.getFullPath()));
+ }
+ }
+
+ @Override
+ public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
+ if (measurementSchemas == null) {
+ measurementSchemas = new MeasurementSchema[measurements.length];
+ }
+ if (measurementSchemaInfo == null) {
+ measurementSchemas[index] = null;
+ } else {
+ measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
+ }
+ if (isNeedInferType) {
+ return;
+ }
+
+ try {
+ selfCheckDataTypes(index);
+ } catch (DataTypeMismatchException | PathNotExistException e) {
+ throw new SemanticException(e);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index 1507dd112b..65c1f106a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -21,14 +21,19 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
@@ -94,4 +99,41 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
}
return ret;
}
+
+ @Override
+ public ISchemaValidation getSchemaValidation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ISchemaValidation> getSchemaValidationList() {
+ return insertRowStatementList.stream()
+ .map(InsertRowStatement::getSchemaValidation)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void updateAfterSchemaValidation() throws QueryProcessException {
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ insertRowStatement.updateAfterSchemaValidation();
+ if (!this.hasFailedMeasurements() && insertRowStatement.hasFailedMeasurements()) {
+ this.failedMeasurementIndex2Info = insertRowStatement.failedMeasurementIndex2Info;
+ }
+ }
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+ return false;
+ }
+
+ @Override
+ public long getMinTime() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getFirstValueOfIndex(int index) {
+ throw new NotImplementedException();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index c2b24e6926..0f0f973b94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -20,12 +20,16 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class InsertRowsStatement extends InsertBaseStatement {
@@ -95,4 +99,41 @@ public class InsertRowsStatement extends InsertBaseStatement {
}
return result;
}
+
+ @Override
+ public ISchemaValidation getSchemaValidation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ISchemaValidation> getSchemaValidationList() {
+ return insertRowStatementList.stream()
+ .map(InsertRowStatement::getSchemaValidation)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void updateAfterSchemaValidation() throws QueryProcessException {
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ insertRowStatement.updateAfterSchemaValidation();
+ if (!this.hasFailedMeasurements() && insertRowStatement.hasFailedMeasurements()) {
+ this.failedMeasurementIndex2Info = insertRowStatement.failedMeasurementIndex2Info;
+ }
+ }
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+ return false;
+ }
+
+ @Override
+ public long getMinTime() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object getFirstValueOfIndex(int index) {
+ throw new NotImplementedException();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index 2da234041a..cfa864fbbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -20,15 +20,36 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-public class InsertTabletStatement extends InsertBaseStatement {
+public class InsertTabletStatement extends InsertBaseStatement implements ISchemaValidation {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InsertTabletStatement.class);
+
+ private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
private long[] times; // times should be sorted. It is done in the session API.
private BitMap[] bitMaps;
@@ -117,4 +138,136 @@ public class InsertTabletStatement extends InsertBaseStatement {
}
return ret;
}
+
+ @Override
+ public ISchemaValidation getSchemaValidation() {
+ return this;
+ }
+
+ @Override
+ public List<ISchemaValidation> getSchemaValidationList() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+ if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
+ LOGGER.warn(
+ "Inserting to {}.{} : Cast from {} to {}",
+ devicePath,
+ measurements[columnIndex],
+ dataTypes[columnIndex],
+ dataType);
+ columns[columnIndex] =
+ CommonUtils.castArray(dataTypes[columnIndex], dataType, columns[columnIndex]);
+ dataTypes[columnIndex] = dataType;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void markFailedMeasurement(int index, Exception cause) {
+ if (measurements[index] == null) {
+ return;
+ }
+
+ if (failedMeasurementIndex2Info == null) {
+ failedMeasurementIndex2Info = new HashMap<>();
+ }
+
+ InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
+ new InsertBaseStatement.FailedMeasurementInfo(
+ measurements[index], dataTypes[index], columns[index], cause);
+ failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
+
+ measurements[index] = null;
+ dataTypes[index] = null;
+ columns[index] = null;
+ }
+
+ @Override
+ public long getMinTime() {
+ return times[0];
+ }
+
+ @Override
+ public Object getFirstValueOfIndex(int index) {
+ Object value;
+ switch (dataTypes[index]) {
+ case INT32:
+ int[] intValues = (int[]) columns[index];
+ value = intValues[0];
+ break;
+ case INT64:
+ long[] longValues = (long[]) columns[index];
+ value = longValues[0];
+ break;
+ case FLOAT:
+ float[] floatValues = (float[]) columns[index];
+ value = floatValues[0];
+ break;
+ case DOUBLE:
+ double[] doubleValues = (double[]) columns[index];
+ value = doubleValues[0];
+ break;
+ case BOOLEAN:
+ boolean[] boolValues = (boolean[]) columns[index];
+ value = boolValues[0];
+ break;
+ case TEXT:
+ Binary[] binaryValues = (Binary[]) columns[index];
+ value = binaryValues[0];
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[index]));
+ }
+ return value;
+ }
+
+ @Override
+ public TSDataType getDataType(int index) {
+ return dataTypes[index];
+ }
+
+ @Override
+ public TSEncoding getEncoding(int index) {
+ return null;
+ }
+
+ @Override
+ public CompressionType getCompressionType(int index) {
+ return null;
+ }
+
+ @Override
+ public void validateDeviceSchema(boolean isAligned) {
+ if (this.isAligned != isAligned) {
+ throw new SemanticException(
+ new AlignedTimeseriesException(
+ String.format(
+ "timeseries under this device are%s aligned, " + "please use %s interface",
+ isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
+ devicePath.getFullPath()));
+ }
+ }
+
+ @Override
+ public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
+ if (measurementSchemas == null) {
+ measurementSchemas = new MeasurementSchema[measurements.length];
+ }
+ if (measurementSchemaInfo == null) {
+ measurementSchemas[index] = null;
+ } else {
+ measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
+ }
+
+ try {
+ selfCheckDataTypes(index);
+ } catch (DataTypeMismatchException | PathNotExistException e) {
+ throw new SemanticException(e);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index 89430bc91f..093aa30e2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -109,9 +108,6 @@ public class TsFilePlanRedoer {
// TODO get device id by idTable
// idTable.getSeriesSchemas(node);
} else {
- if (!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
- SchemaValidator.validate(node);
- }
node.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(node.getDevicePath()));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index 34b2655e6c..89406f8f58 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -282,11 +282,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -304,11 +304,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -445,11 +445,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -466,11 +466,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -533,11 +533,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -554,11 +554,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -621,11 +621,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode1.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -642,11 +642,11 @@ public class DataRegionTest {
false,
measurements,
dataTypes,
+ measurementSchemas,
times,
null,
columns,
times.length);
- insertTabletNode2.setMeasurementSchemas(measurementSchemas);
dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 8cdfeeba81..7c03f926a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -64,7 +64,12 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
@Override
public void fetchAndComputeSchemaWithAutoCreate(
- List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {}
+ List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {
+ for (ISchemaComputationWithAutoCreation computation : schemaComputationWithAutoCreationList) {
+ computation.computeMeasurement(
+ 0, new SchemaMeasurementNode("s", new MeasurementSchema("s", TSDataType.INT32)));
+ }
+ }
/**
* Generate the following tree: root.sg.d1.s1, root.sg.d1.s2(status) root.sg.d2.s1,
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index bb696fdda8..1156021d7f 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -236,18 +236,6 @@ public class WALFileTest {
}
bitMaps[i].mark(i % times.length);
}
-
- InsertTabletNode insertTabletNode =
- new InsertTabletNode(
- new PlanNodeId(""),
- new PartialPath(devicePath),
- false,
- new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
- dataTypes,
- times,
- bitMaps,
- columns,
- times.length);
MeasurementSchema[] schemas =
new MeasurementSchema[] {
new MeasurementSchema("s1", dataTypes[0]),
@@ -257,9 +245,18 @@ public class WALFileTest {
new MeasurementSchema("s5", dataTypes[4]),
new MeasurementSchema("s6", dataTypes[5]),
};
- insertTabletNode.setMeasurementSchemas(schemas);
- return insertTabletNode;
+ return new InsertTabletNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ schemas,
+ times,
+ bitMaps,
+ columns,
+ times.length);
}
public static DeleteDataNode getDeleteDataNode(String devicePath) throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index 517b7aeaf1..508338a6e0 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -697,19 +697,12 @@ public class ConsensusReqReaderTest {
bitMaps[i].mark(i % times.length);
}
- InsertTabletNode insertTabletNode =
- new InsertTabletNode(
- new PlanNodeId(""),
- new PartialPath(devicePath),
- false,
- new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
- dataTypes,
- times,
- bitMaps,
- columns,
- times.length);
-
- insertTabletNode.setMeasurementSchemas(
+ return new InsertTabletNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.DOUBLE),
new MeasurementSchema("s2", TSDataType.FLOAT),
@@ -717,9 +710,11 @@ public class ConsensusReqReaderTest {
new MeasurementSchema("s4", TSDataType.INT32),
new MeasurementSchema("s5", TSDataType.BOOLEAN),
new MeasurementSchema("s6", TSDataType.TEXT)
- });
-
- return insertTabletNode;
+ },
+ times,
+ bitMaps,
+ columns,
+ times.length);
}
private DeleteDataNode getDeleteDataNode(String devicePath) throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 770a3c7fb2..d2820fbde6 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -195,24 +195,22 @@ public class WALNodeTest {
}
bitMaps[i].mark(i % times.length);
}
-
- InsertTabletNode insertTabletNode =
- new InsertTabletNode(
- new PlanNodeId(""),
- new PartialPath(devicePath),
- false,
- measurements,
- dataTypes,
- times,
- bitMaps,
- columns,
- times.length);
MeasurementSchema[] schemas = new MeasurementSchema[6];
for (int i = 0; i < 6; i++) {
schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i], TSEncoding.PLAIN);
}
- insertTabletNode.setMeasurementSchemas(schemas);
- return insertTabletNode;
+
+ return new InsertTabletNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ measurements,
+ dataTypes,
+ schemas,
+ times,
+ bitMaps,
+ columns,
+ times.length);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
index 91843aecf6..dffbf32181 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
@@ -291,6 +291,7 @@ public class TsFilePlanRedoerTest {
false,
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
+ null,
times,
bitMaps,
columns,
@@ -392,6 +393,7 @@ public class TsFilePlanRedoerTest {
TSDataType.FLOAT,
TSDataType.TEXT
},
+ null,
times,
bitMaps,
columns,
@@ -473,6 +475,10 @@ public class TsFilePlanRedoerTest {
false,
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT64),
+ },
times,
null,
columns,
@@ -520,15 +526,14 @@ public class TsFilePlanRedoerTest {
false,
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT64),
+ },
times,
null,
columns,
times.length);
- insertTabletNode.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
- new MeasurementSchema("s2", TSDataType.INT64),
- });
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false, null);
@@ -634,6 +639,14 @@ public class TsFilePlanRedoerTest {
// mark value of time=9 as null
bitMaps[i].mark(3);
}
+ MeasurementSchema[] schemas =
+ new MeasurementSchema[] {
+ null,
+ new MeasurementSchema("s2", TSDataType.INT64),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN),
+ new MeasurementSchema("s4", TSDataType.FLOAT),
+ null
+ };
InsertTabletNode insertTabletNode =
new InsertTabletNode(
new PlanNodeId(""),
@@ -647,20 +660,13 @@ public class TsFilePlanRedoerTest {
TSDataType.FLOAT,
TSDataType.TEXT
},
+ schemas,
times,
bitMaps,
columns,
times.length);
// redo InsertTabletPlan, data region is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
- MeasurementSchema[] schemas =
- new MeasurementSchema[] {
- null,
- new MeasurementSchema("s2", TSDataType.INT64),
- new MeasurementSchema("s3", TSDataType.BOOLEAN),
- new MeasurementSchema("s4", TSDataType.FLOAT),
- null
- };
insertTabletNode.setMeasurementSchemas(schemas);
planRedoer.redoInsert(insertTabletNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
index d1fe54b54b..95875118cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
@@ -286,7 +286,7 @@ public class UnsealedTsFileRecoverPerformerTest {
time,
new Integer[] {1},
false);
- insertRowNode.markFailedMeasurement(0, new Exception());
+ insertRowNode.markFailedMeasurement(0);
// generate InsertTabletNode with null
time = 5;
@@ -297,11 +297,12 @@ public class UnsealedTsFileRecoverPerformerTest {
false,
new String[] {"s1"},
new TSDataType[] {TSDataType.INT64},
+ null,
new long[] {time},
null,
new Integer[] {1},
1);
- insertTabletNode.markFailedMeasurement(0, new Exception());
+ insertTabletNode.markFailedMeasurement(0);
int fakeMemTableId = 1;
WALEntry walEntry1 = new WALInfoEntry(fakeMemTableId++, insertRowNode);