You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/05/05 10:07:00 UTC

[iotdb] 03/03: fix ut

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch forward_schema_validate
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e4b428216df12d0f8966c5dc74cc0edd336daa97
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri May 5 18:06:36 2023 +0800

    fix ut
---
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 47 +++++++++++++++++++---
 .../mpp/plan/analyze/schema/SchemaValidator.java   | 11 +++--
 .../planner/plan/node/write/InsertTabletNode.java  | 17 ++++++++
 .../plan/statement/crud/InsertBaseStatement.java   | 36 -----------------
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |  7 +++-
 5 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f30385724bf..644868c7475 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -31,6 +32,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
 import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
@@ -92,6 +94,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -206,6 +209,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   private final IPartitionFetcher partitionFetcher;
   private final ISchemaFetcher schemaFetcher;
 
+  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+      PerformanceOverviewMetrics.getInstance();
+
   public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
@@ -2028,7 +2034,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     Analysis analysis = new Analysis();
     analysis.setStatement(insertTabletStatement);
-    insertTabletStatement.validateSchema(analysis);
+    validateSchema(analysis, insertTabletStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
     }
@@ -2045,7 +2051,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     Analysis analysis = new Analysis();
     analysis.setStatement(insertRowStatement);
-    insertRowStatement.validateSchema(analysis);
+    validateSchema(analysis, insertRowStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
     }
@@ -2064,7 +2070,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     Analysis analysis = new Analysis();
     analysis.setStatement(insertRowsStatement);
-    insertRowsStatement.validateSchema(analysis);
+    validateSchema(analysis, insertRowsStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
     }
@@ -2094,7 +2100,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     Analysis analysis = new Analysis();
     analysis.setStatement(insertMultiTabletsStatement);
-    insertMultiTabletsStatement.validateSchema(analysis);
+    validateSchema(analysis, insertMultiTabletsStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
     }
@@ -2125,7 +2131,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     Analysis analysis = new Analysis();
     analysis.setStatement(insertRowsOfOneDeviceStatement);
-    insertRowsOfOneDeviceStatement.validateSchema(analysis);
+    validateSchema(analysis, insertRowsOfOneDeviceStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
     }
@@ -2139,6 +2145,36 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
   }
 
+  private void validateSchema(Analysis analysis, InsertBaseStatement insertStatement) {
+    final long startTime = System.nanoTime();
+    try {
+      SchemaValidator.validate(schemaFetcher, insertStatement);
+    } catch (SemanticException e) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      if (e.getCause() instanceof IoTDBException) {
+        IoTDBException ioTDBException = (IoTDBException) e.getCause();
+        analysis.setFailStatus(
+            RpcUtils.getStatus(ioTDBException.getErrorCode(), ioTDBException.getMessage()));
+      } else {
+        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
+      }
+      return;
+    } finally {
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
+    }
+    boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
+    String partialInsertMessage;
+    if (hasFailedMeasurement) {
+      partialInsertMessage =
+          String.format(
+              "Fail to insert measurements %s caused by %s",
+              insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages());
+      logger.warn(partialInsertMessage);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
+    }
+  }
+
   @Override
   public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
@@ -2381,6 +2417,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     }
 
     return SchemaValidator.validate(
+        schemaFetcher,
         deviceList,
         measurementList,
         dataTypeList,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 1c8655843c9..5005b9f7a78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -35,17 +35,15 @@ import java.util.List;
 
 public class SchemaValidator {
 
-  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
-
-  public static void validate(InsertBaseStatement insertStatement) {
+  public static void validate(ISchemaFetcher schemaFetcher, InsertBaseStatement insertStatement) {
     try {
       if (insertStatement instanceof InsertRowsStatement
           || insertStatement instanceof InsertMultiTabletsStatement
           || insertStatement instanceof InsertRowsOfOneDeviceStatement) {
-        SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
+        schemaFetcher.fetchAndComputeSchemaWithAutoCreate(
             insertStatement.getSchemaValidationList());
       } else {
-        SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidation());
+        schemaFetcher.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidation());
       }
       insertStatement.updateAfterSchemaValidation();
     } catch (QueryProcessException e) {
@@ -54,13 +52,14 @@ public class SchemaValidator {
   }
 
   public static ISchemaTree validate(
+      ISchemaFetcher schemaFetcher,
       List<PartialPath> devicePaths,
       List<String[]> measurements,
       List<TSDataType[]> dataTypes,
       List<TSEncoding[]> encodings,
       List<CompressionType[]> compressionTypes,
       List<Boolean> isAlignedList) {
-    return SCHEMA_FETCHER.fetchSchemaListWithAutoCreate(
+    return schemaFetcher.fetchSchemaListWithAutoCreate(
         devicePaths, measurements, dataTypes, encodings, compressionTypes, isAlignedList);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index bbfd5052edf..7fcdaa1e3b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -88,6 +88,23 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     super(id);
   }
 
+  public InsertTabletNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      long[] times,
+      BitMap[] bitMaps,
+      Object[] columns,
+      int rowCount) {
+    super(id, devicePath, isAligned, measurements, dataTypes);
+    this.times = times;
+    this.bitMaps = bitMaps;
+    this.columns = columns;
+    this.rowCount = rowCount;
+  }
+
   public InsertTabletNode(
       PlanNodeId id,
       PartialPath devicePath,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index 27e790ea74a..fa2cc513ce9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -18,20 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -123,36 +117,6 @@ public abstract class InsertBaseStatement extends Statement {
 
   public void updateAfterSchemaValidation() throws QueryProcessException {}
 
-  public void validateSchema(Analysis analysis) {
-    final long startTime = System.nanoTime();
-    try {
-      SchemaValidator.validate(this);
-    } catch (SemanticException e) {
-      analysis.setFinishQueryAfterAnalyze(true);
-      if (e.getCause() instanceof IoTDBException) {
-        IoTDBException ioTDBException = (IoTDBException) e.getCause();
-        analysis.setFailStatus(
-            RpcUtils.getStatus(ioTDBException.getErrorCode(), ioTDBException.getMessage()));
-      } else {
-        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
-      }
-      return;
-    } finally {
-      PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
-    }
-    boolean hasFailedMeasurement = hasFailedMeasurements();
-    String partialInsertMessage;
-    if (hasFailedMeasurement) {
-      partialInsertMessage =
-          String.format(
-              "Fail to insert measurements %s caused by %s",
-              getFailedMeasurements(), getFailedMessages());
-      LOGGER.warn(partialInsertMessage);
-      analysis.setFailStatus(
-          RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
-    }
-  }
-
   /** Check whether data types are matched with measurement schemas */
   protected void selfCheckDataTypes(int index)
       throws DataTypeMismatchException, PathNotExistException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 8cdfeeba815..7c03f926a1a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -64,7 +64,12 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
 
   @Override
   public void fetchAndComputeSchemaWithAutoCreate(
-      List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {}
+      List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {
+    for (ISchemaComputationWithAutoCreation computation : schemaComputationWithAutoCreationList) {
+      computation.computeMeasurement(
+          0, new SchemaMeasurementNode("s", new MeasurementSchema("s", TSDataType.INT32)));
+    }
+  }
 
   /**
    * Generate the following tree: root.sg.d1.s1, root.sg.d1.s2(status) root.sg.d2.s1,