You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/05/05 10:07:00 UTC
[iotdb] 03/03: fix ut
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch forward_schema_validate
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e4b428216df12d0f8966c5dc74cc0edd336daa97
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri May 5 18:06:36 2023 +0800
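fix ut (unit tests): pass the ISchemaFetcher into SchemaValidator instead of using the ClusterSchemaFetcher singleton, and move validateSchema from InsertBaseStatement into AnalyzeVisitor so tests can supply FakeSchemaFetcherImpl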
---
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 47 +++++++++++++++++++---
.../mpp/plan/analyze/schema/SchemaValidator.java | 11 +++--
.../planner/plan/node/write/InsertTabletNode.java | 17 ++++++++
.../plan/statement/crud/InsertBaseStatement.java | 36 -----------------
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 7 +++-
5 files changed, 70 insertions(+), 48 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f30385724bf..644868c7475 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -31,6 +32,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeClientManager;
@@ -92,6 +94,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -206,6 +209,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
+ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+ PerformanceOverviewMetrics.getInstance();
+
public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
@@ -2028,7 +2034,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(insertTabletStatement);
- insertTabletStatement.validateSchema(analysis);
+ validateSchema(analysis, insertTabletStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
@@ -2045,7 +2051,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(insertRowStatement);
- insertRowStatement.validateSchema(analysis);
+ validateSchema(analysis, insertRowStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
@@ -2064,7 +2070,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(insertRowsStatement);
- insertRowsStatement.validateSchema(analysis);
+ validateSchema(analysis, insertRowsStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
@@ -2094,7 +2100,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(insertMultiTabletsStatement);
- insertMultiTabletsStatement.validateSchema(analysis);
+ validateSchema(analysis, insertMultiTabletsStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
@@ -2125,7 +2131,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(insertRowsOfOneDeviceStatement);
- insertRowsOfOneDeviceStatement.validateSchema(analysis);
+ validateSchema(analysis, insertRowsOfOneDeviceStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
@@ -2139,6 +2145,36 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
}
+ private void validateSchema(Analysis analysis, InsertBaseStatement insertStatement) {
+ final long startTime = System.nanoTime();
+ try {
+ SchemaValidator.validate(schemaFetcher, insertStatement);
+ } catch (SemanticException e) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ if (e.getCause() instanceof IoTDBException) {
+ IoTDBException ioTDBException = (IoTDBException) e.getCause();
+ analysis.setFailStatus(
+ RpcUtils.getStatus(ioTDBException.getErrorCode(), ioTDBException.getMessage()));
+ } else {
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
+ }
+ return;
+ } finally {
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
+ }
+ boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
+ String partialInsertMessage;
+ if (hasFailedMeasurement) {
+ partialInsertMessage =
+ String.format(
+ "Fail to insert measurements %s caused by %s",
+ insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages());
+ logger.warn(partialInsertMessage);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
+ }
+ }
+
@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
@@ -2381,6 +2417,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
return SchemaValidator.validate(
+ schemaFetcher,
deviceList,
measurementList,
dataTypeList,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 1c8655843c9..5005b9f7a78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -35,17 +35,15 @@ import java.util.List;
public class SchemaValidator {
- private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
-
- public static void validate(InsertBaseStatement insertStatement) {
+ public static void validate(ISchemaFetcher schemaFetcher, InsertBaseStatement insertStatement) {
try {
if (insertStatement instanceof InsertRowsStatement
|| insertStatement instanceof InsertMultiTabletsStatement
|| insertStatement instanceof InsertRowsOfOneDeviceStatement) {
- SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
+ schemaFetcher.fetchAndComputeSchemaWithAutoCreate(
insertStatement.getSchemaValidationList());
} else {
- SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidation());
+ schemaFetcher.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidation());
}
insertStatement.updateAfterSchemaValidation();
} catch (QueryProcessException e) {
@@ -54,13 +52,14 @@ public class SchemaValidator {
}
public static ISchemaTree validate(
+ ISchemaFetcher schemaFetcher,
List<PartialPath> devicePaths,
List<String[]> measurements,
List<TSDataType[]> dataTypes,
List<TSEncoding[]> encodings,
List<CompressionType[]> compressionTypes,
List<Boolean> isAlignedList) {
- return SCHEMA_FETCHER.fetchSchemaListWithAutoCreate(
+ return schemaFetcher.fetchSchemaListWithAutoCreate(
devicePaths, measurements, dataTypes, encodings, compressionTypes, isAlignedList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index bbfd5052edf..7fcdaa1e3b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -88,6 +88,23 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
super(id);
}
+ public InsertTabletNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ long[] times,
+ BitMap[] bitMaps,
+ Object[] columns,
+ int rowCount) {
+ super(id, devicePath, isAligned, measurements, dataTypes);
+ this.times = times;
+ this.bitMaps = bitMaps;
+ this.columns = columns;
+ this.rowCount = rowCount;
+ }
+
public InsertTabletNode(
PlanNodeId id,
PartialPath devicePath,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index 27e790ea74a..fa2cc513ce9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -18,20 +18,14 @@
*/
package org.apache.iotdb.db.mpp.plan.statement.crud;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -123,36 +117,6 @@ public abstract class InsertBaseStatement extends Statement {
public void updateAfterSchemaValidation() throws QueryProcessException {}
- public void validateSchema(Analysis analysis) {
- final long startTime = System.nanoTime();
- try {
- SchemaValidator.validate(this);
- } catch (SemanticException e) {
- analysis.setFinishQueryAfterAnalyze(true);
- if (e.getCause() instanceof IoTDBException) {
- IoTDBException ioTDBException = (IoTDBException) e.getCause();
- analysis.setFailStatus(
- RpcUtils.getStatus(ioTDBException.getErrorCode(), ioTDBException.getMessage()));
- } else {
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
- }
- return;
- } finally {
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
- }
- boolean hasFailedMeasurement = hasFailedMeasurements();
- String partialInsertMessage;
- if (hasFailedMeasurement) {
- partialInsertMessage =
- String.format(
- "Fail to insert measurements %s caused by %s",
- getFailedMeasurements(), getFailedMessages());
- LOGGER.warn(partialInsertMessage);
- analysis.setFailStatus(
- RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
- }
- }
-
/** Check whether data types are matched with measurement schemas */
protected void selfCheckDataTypes(int index)
throws DataTypeMismatchException, PathNotExistException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 8cdfeeba815..7c03f926a1a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -64,7 +64,12 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
@Override
public void fetchAndComputeSchemaWithAutoCreate(
- List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {}
+ List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {
+ for (ISchemaComputationWithAutoCreation computation : schemaComputationWithAutoCreationList) {
+ computation.computeMeasurement(
+ 0, new SchemaMeasurementNode("s", new MeasurementSchema("s", TSDataType.INT32)));
+ }
+ }
/**
* Generate the following tree: root.sg.d1.s1, root.sg.d1.s2(status) root.sg.d2.s1,