You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/06/05 17:12:26 UTC

[iotdb] 01/01: [IOTDB-1422] Support partial insert for new vector interfaces

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch vector_partial
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 489c7de6f6046fe8f00b3d4c222b6046e5279652
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jun 6 01:11:39 2021 +0800

    [IOTDB-1422] Support partial insert for new vector interfaces
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 53 +++++++++++-----------
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 34 +++++++-------
 .../org/apache/iotdb/db/metadata/MManager.java     | 47 ++++++++++---------
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 18 ++++++++
 .../iotdb/db/metadata/MManagerBasicTest.java       |  4 +-
 .../java/org/apache/iotdb/session/Session.java     |  3 +-
 6 files changed, 92 insertions(+), 67 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 37ea190..795ed18 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -39,7 +39,9 @@ import java.util.List;
 public class AlignedTimeseriesSessionExample {
 
   private static Session session;
-  private static final String ROOT_SG1_D1_VECTOR = "root.sg_1.d1.vector";
+  private static final String ROOT_SG1_D1 = "root.sg_1.d1";
+  private static final String ROOT_SG1_D1_VECTOR1 = "root.sg_1.d1.vector";
+  private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2";
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException {
@@ -55,30 +57,29 @@ public class AlignedTimeseriesSessionExample {
 
     insertTabletWithAlignedTimeseriesMethod1();
     insertTabletWithAlignedTimeseriesMethod2();
-
     insertNullableTabletWithAlignedTimeseries();
+
     selectTest();
     selectWithValueFilterTest();
-
     selectWithGroupByTest();
     selectWithLastTest();
 
     selectWithAggregationTest();
-
-    selectWithAlignByDeviceTest();
+    // FIXME @Silver Narcissus
+    // selectWithAlignByDeviceTest();
 
     session.close();
   }
 
   private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1");
+    SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
 
     dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
+    dataSet = session.executeQueryStatement("select * from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -102,8 +103,7 @@ public class AlignedTimeseriesSessionExample {
   private static void selectWithValueFilterTest()
       throws StatementExecutionException, IoTDBConnectionException {
     SessionDataSet dataSet =
-        session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 0");
-    System.out.println(dataSet.getColumnNames());
+        session.executeQueryStatement("select s1 from root.sg_1.d1.vector where s1 > 0");
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
@@ -111,7 +111,7 @@ public class AlignedTimeseriesSessionExample {
     dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
-            "select * from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000");
+            "select * from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -122,7 +122,8 @@ public class AlignedTimeseriesSessionExample {
 
   private static void selectWithAggregationTest()
       throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1");
+    SessionDataSet dataSet =
+        session.executeQueryStatement("select count(s1) from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -131,7 +132,7 @@ public class AlignedTimeseriesSessionExample {
     dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
-            "select sum(*) from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000");
+            "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -144,7 +145,7 @@ public class AlignedTimeseriesSessionExample {
       throws StatementExecutionException, IoTDBConnectionException {
     SessionDataSet dataSet =
         session.executeQueryStatement(
-            "select count(s1) from root.sg_1.d1 GROUP BY ([1, 100), 20ms)");
+            "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -153,7 +154,7 @@ public class AlignedTimeseriesSessionExample {
     dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
-            "select count(*) from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000"
+            "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"
                 + " GROUP BY ([50, 100), 10ms)");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
@@ -165,14 +166,15 @@ public class AlignedTimeseriesSessionExample {
 
   private static void selectWithLastTest()
       throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1");
+    SessionDataSet dataSet =
+        session.executeQueryStatement("select last s1 from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
 
     dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1");
+    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -195,7 +197,7 @@ public class AlignedTimeseriesSessionExample {
       encodings.add(TSEncoding.RLE);
     }
     session.createAlignedTimeseries(
-        ROOT_SG1_D1_VECTOR, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
+        ROOT_SG1_D1_VECTOR1, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
   }
 
   // be sure template is coordinate with tablet
@@ -244,7 +246,7 @@ public class AlignedTimeseriesSessionExample {
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
 
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
     tablet.setAligned(true);
     long timestamp = System.currentTimeMillis();
 
@@ -283,11 +285,11 @@ public class AlignedTimeseriesSessionExample {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(
         new VectorMeasurementSchema(
-            "vector",
+            "vector2",
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
 
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
     tablet.setAligned(true);
     long[] timestamps = tablet.timestamps;
     Object[] values = tablet.values;
@@ -323,11 +325,11 @@ public class AlignedTimeseriesSessionExample {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(
         new VectorMeasurementSchema(
-            "vector",
+            "vector3",
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
 
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
     tablet.setAligned(true);
 
     long[] timestamps = tablet.timestamps;
@@ -373,17 +375,14 @@ public class AlignedTimeseriesSessionExample {
     List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
-    measurements.add("s3");
     types.add(TSDataType.INT64);
     types.add(TSDataType.INT32);
-    types.add(TSDataType.INT64);
 
-    for (long time = 0; time < 100; time++) {
+    for (long time = 0; time < 1; time++) {
       List<Object> values = new ArrayList<>();
       values.add(1L);
       values.add(2);
-      values.add(3L);
-      session.insertRecord(ROOT_SG1_D1_VECTOR, time, measurements, types, values, true);
+      session.insertRecord(ROOT_SG1_D1_VECTOR2, time, measurements, types, values, true);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d00c2e8..d62f8fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -119,23 +119,25 @@ public abstract class AbstractMemTable implements IMemTable {
     int columnIndex = 0;
     if (insertRowPlan.isAligned()) {
       MeasurementMNode measurementMNode = measurementMNodes[0];
-      // write vector
-      Object[] vectorValue =
-          new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
-      for (int j = 0; j < vectorValue.length; j++) {
-        vectorValue[j] = values[columnIndex];
-        columnIndex++;
+      if (measurementMNode != null) {
+        // write vector
+        Object[] vectorValue =
+            new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
+        for (int j = 0; j < vectorValue.length; j++) {
+          vectorValue[j] = values[columnIndex];
+          columnIndex++;
+        }
+        memSize +=
+            MemUtils.getVectorRecordSize(
+                measurementMNode.getSchema().getValueTSDataTypeList(),
+                vectorValue,
+                disableMemControl);
+        write(
+            insertRowPlan.getPrefixPath().getFullPath(),
+            measurementMNode.getSchema(),
+            insertRowPlan.getTime(),
+            vectorValue);
       }
-      memSize +=
-          MemUtils.getVectorRecordSize(
-              measurementMNode.getSchema().getValueTSDataTypeList(),
-              vectorValue,
-              disableMemControl);
-      write(
-          insertRowPlan.getPrefixPath().getFullPath(),
-          measurementMNode.getSchema(),
-          insertRowPlan.getTime(),
-          vectorValue);
     } else {
       for (MeasurementMNode measurementMNode : measurementMNodes) {
         if (values[columnIndex] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 3e9af1b..3a94e9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -2201,9 +2201,7 @@ public class MManager {
         }
 
         // check type is match
-        boolean mismatch = false;
         TSDataType insertDataType;
-        DataTypeMismatchException mismatchException = null;
         if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
           if (plan.isAligned()) {
             TSDataType dataTypeInNode =
@@ -2213,16 +2211,27 @@ public class MManager {
               insertDataType = dataTypeInNode;
             }
             if (dataTypeInNode != insertDataType) {
-              mismatch = true;
               logger.warn(
                   "DataType mismatch, Insert measurement {} in {} type {}, metadata tree type {}",
                   measurementMNode.getSchema().getValueMeasurementIdList().get(i),
                   measurementList[i],
                   insertDataType,
                   dataTypeInNode);
-              mismatchException =
+              DataTypeMismatchException mismatchException =
                   new DataTypeMismatchException(measurementList[i], insertDataType, dataTypeInNode);
+              if (!config.isEnablePartialInsert()) {
+                throw mismatchException;
+              } else {
+                // mark failed measurement
+                plan.markFailedMeasurementAlignedInsertion(mismatchException);
+                for (int j = 0; j < i; j++) {
+                  // all the measurementMNodes should be null
+                  measurementMNodes[j] = null;
+                }
+                break;
+              }
             }
+            measurementMNodes[i] = measurementMNode;
           } else {
             if (plan instanceof InsertRowPlan) {
               if (!((InsertRowPlan) plan).isNeedInferType()) {
@@ -2234,34 +2243,28 @@ public class MManager {
             } else {
               insertDataType = getTypeInLoc(plan, i);
             }
-            mismatch = measurementMNode.getSchema().getType() != insertDataType;
-            if (mismatch) {
+            if (measurementMNode.getSchema().getType() != insertDataType) {
               logger.warn(
                   "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
                   measurementList[i],
                   insertDataType,
                   measurementMNode.getSchema().getType());
-              mismatchException =
+              DataTypeMismatchException mismatchException =
                   new DataTypeMismatchException(
                       measurementList[i], insertDataType, measurementMNode.getSchema().getType());
+              if (!config.isEnablePartialInsert()) {
+                throw mismatchException;
+              } else {
+                // mark failed measurement
+                plan.markFailedMeasurementInsertion(i, mismatchException);
+                continue;
+              }
             }
+            measurementMNodes[i] = measurementMNode;
+            // set measurementName instead of alias
+            measurementList[i] = measurementMNode.getName();
           }
         }
-
-        if (mismatch) {
-          if (!config.isEnablePartialInsert()) {
-            throw mismatchException;
-          } else {
-            // mark failed measurement
-            plan.markFailedMeasurementInsertion(i, mismatchException);
-            continue;
-          }
-        }
-
-        measurementMNodes[i] = measurementMNode;
-
-        // set measurementName instead of alias
-        measurementList[i] = measurementMNode.getName();
       } catch (MetadataException e) {
         logger.warn(
             "meet error when check {}.{}, message: {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index c05d19a..8d5ad71 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -120,6 +120,24 @@ public abstract class InsertPlan extends PhysicalPlan {
     measurements[index] = null;
   }
 
+  public void markFailedMeasurementAlignedInsertion(Exception e) {
+    if (failedMeasurements == null) {
+      failedMeasurements = new ArrayList<>();
+      failedExceptions = new ArrayList<>();
+      failedIndices = new ArrayList<>();
+    }
+
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
+      failedMeasurements.add(measurements[i]);
+      failedExceptions.add(e);
+      failedIndices.add(i);
+      measurements[i] = null;
+    }
+  }
+
   /**
    * Reconstruct this plan with the failed measurements.
    *
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 85422f4..5580b99 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -1370,8 +1370,10 @@ public class MManagerBasicTest {
       // call getSeriesSchemasAndReadLockDevice
       MNode mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
       assertEquals(4, mNode.getMeasurementMNodeCount());
+      assertNull(insertRowPlan.getMeasurementMNodes()[0]);
       assertNull(insertRowPlan.getMeasurementMNodes()[1]);
-      assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
+      assertNull(insertRowPlan.getMeasurementMNodes()[2]);
+      assertEquals(3, insertRowPlan.getFailedMeasurementNumber());
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index dbdabc3..179b32e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1192,10 +1192,11 @@ public class Session {
 
     TSInsertTabletReq request = new TSInsertTabletReq();
 
-    if (request.isAligned) {
+    if (tablet.isAligned()) {
       if (tablet.getSchemas().size() > 1) {
         throw new BatchExecutionException("One tablet should only contain one aligned timeseries!");
       }
+      request.setIsAligned(true);
       IMeasurementSchema measurementSchema = tablet.getSchemas().get(0);
       request.setPrefixPath(
           tablet.deviceId + TsFileConstant.PATH_SEPARATOR + measurementSchema.getMeasurementId());