You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/06/05 17:12:25 UTC

[iotdb] branch vector_partial created (now 489c7de)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch vector_partial
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 489c7de  [IOTDB-1422] Support partial insert for new vector interfaces

This branch includes the following new commits:

     new 489c7de  [IOTDB-1422] Support partial insert for new vector interfaces

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [IOTDB-1422] Support partial insert for new vector interfaces

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch vector_partial
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 489c7de6f6046fe8f00b3d4c222b6046e5279652
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jun 6 01:11:39 2021 +0800

    [IOTDB-1422] Support partial insert for new vector interfaces
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 53 +++++++++++-----------
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 34 +++++++-------
 .../org/apache/iotdb/db/metadata/MManager.java     | 47 ++++++++++---------
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 18 ++++++++
 .../iotdb/db/metadata/MManagerBasicTest.java       |  4 +-
 .../java/org/apache/iotdb/session/Session.java     |  3 +-
 6 files changed, 92 insertions(+), 67 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 37ea190..795ed18 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -39,7 +39,9 @@ import java.util.List;
 public class AlignedTimeseriesSessionExample {
 
   private static Session session;
-  private static final String ROOT_SG1_D1_VECTOR = "root.sg_1.d1.vector";
+  private static final String ROOT_SG1_D1 = "root.sg_1.d1";
+  private static final String ROOT_SG1_D1_VECTOR1 = "root.sg_1.d1.vector";
+  private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2";
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException {
@@ -55,30 +57,29 @@ public class AlignedTimeseriesSessionExample {
 
     insertTabletWithAlignedTimeseriesMethod1();
     insertTabletWithAlignedTimeseriesMethod2();
-
     insertNullableTabletWithAlignedTimeseries();
+
     selectTest();
     selectWithValueFilterTest();
-
     selectWithGroupByTest();
     selectWithLastTest();
 
     selectWithAggregationTest();
-
-    selectWithAlignByDeviceTest();
+    // FIXME @Silver Narcissus
+    // selectWithAlignByDeviceTest();
 
     session.close();
   }
 
   private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1");
+    SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
 
     dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
+    dataSet = session.executeQueryStatement("select * from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -102,8 +103,7 @@ public class AlignedTimeseriesSessionExample {
   private static void selectWithValueFilterTest()
       throws StatementExecutionException, IoTDBConnectionException {
     SessionDataSet dataSet =
-        session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 0");
-    System.out.println(dataSet.getColumnNames());
+        session.executeQueryStatement("select s1 from root.sg_1.d1.vector where s1 > 0");
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
@@ -111,7 +111,7 @@ public class AlignedTimeseriesSessionExample {
     dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
-            "select * from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000");
+            "select * from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -122,7 +122,8 @@ public class AlignedTimeseriesSessionExample {
 
   private static void selectWithAggregationTest()
       throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1");
+    SessionDataSet dataSet =
+        session.executeQueryStatement("select count(s1) from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -131,7 +132,7 @@ public class AlignedTimeseriesSessionExample {
     dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
-            "select sum(*) from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000");
+            "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -144,7 +145,7 @@ public class AlignedTimeseriesSessionExample {
       throws StatementExecutionException, IoTDBConnectionException {
     SessionDataSet dataSet =
         session.executeQueryStatement(
-            "select count(s1) from root.sg_1.d1 GROUP BY ([1, 100), 20ms)");
+            "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -153,7 +154,7 @@ public class AlignedTimeseriesSessionExample {
     dataSet.closeOperationHandle();
     dataSet =
         session.executeQueryStatement(
-            "select count(*) from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000"
+            "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"
                 + " GROUP BY ([50, 100), 10ms)");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
@@ -165,14 +166,15 @@ public class AlignedTimeseriesSessionExample {
 
   private static void selectWithLastTest()
       throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1");
+    SessionDataSet dataSet =
+        session.executeQueryStatement("select last s1 from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
 
     dataSet.closeOperationHandle();
-    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1");
+    dataSet = session.executeQueryStatement("select last * from root.sg_1.d1.vector");
     System.out.println(dataSet.getColumnNames());
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
@@ -195,7 +197,7 @@ public class AlignedTimeseriesSessionExample {
       encodings.add(TSEncoding.RLE);
     }
     session.createAlignedTimeseries(
-        ROOT_SG1_D1_VECTOR, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
+        ROOT_SG1_D1_VECTOR1, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
   }
 
   // be sure template is coordinate with tablet
@@ -244,7 +246,7 @@ public class AlignedTimeseriesSessionExample {
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
 
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
     tablet.setAligned(true);
     long timestamp = System.currentTimeMillis();
 
@@ -283,11 +285,11 @@ public class AlignedTimeseriesSessionExample {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(
         new VectorMeasurementSchema(
-            "vector",
+            "vector2",
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
 
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
     tablet.setAligned(true);
     long[] timestamps = tablet.timestamps;
     Object[] values = tablet.values;
@@ -323,11 +325,11 @@ public class AlignedTimeseriesSessionExample {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(
         new VectorMeasurementSchema(
-            "vector",
+            "vector3",
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
 
-    Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
     tablet.setAligned(true);
 
     long[] timestamps = tablet.timestamps;
@@ -373,17 +375,14 @@ public class AlignedTimeseriesSessionExample {
     List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
-    measurements.add("s3");
     types.add(TSDataType.INT64);
     types.add(TSDataType.INT32);
-    types.add(TSDataType.INT64);
 
-    for (long time = 0; time < 100; time++) {
+    for (long time = 0; time < 1; time++) {
       List<Object> values = new ArrayList<>();
       values.add(1L);
       values.add(2);
-      values.add(3L);
-      session.insertRecord(ROOT_SG1_D1_VECTOR, time, measurements, types, values, true);
+      session.insertRecord(ROOT_SG1_D1_VECTOR2, time, measurements, types, values, true);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d00c2e8..d62f8fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -119,23 +119,25 @@ public abstract class AbstractMemTable implements IMemTable {
     int columnIndex = 0;
     if (insertRowPlan.isAligned()) {
       MeasurementMNode measurementMNode = measurementMNodes[0];
-      // write vector
-      Object[] vectorValue =
-          new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
-      for (int j = 0; j < vectorValue.length; j++) {
-        vectorValue[j] = values[columnIndex];
-        columnIndex++;
+      if (measurementMNode != null) {
+        // write vector
+        Object[] vectorValue =
+            new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
+        for (int j = 0; j < vectorValue.length; j++) {
+          vectorValue[j] = values[columnIndex];
+          columnIndex++;
+        }
+        memSize +=
+            MemUtils.getVectorRecordSize(
+                measurementMNode.getSchema().getValueTSDataTypeList(),
+                vectorValue,
+                disableMemControl);
+        write(
+            insertRowPlan.getPrefixPath().getFullPath(),
+            measurementMNode.getSchema(),
+            insertRowPlan.getTime(),
+            vectorValue);
       }
-      memSize +=
-          MemUtils.getVectorRecordSize(
-              measurementMNode.getSchema().getValueTSDataTypeList(),
-              vectorValue,
-              disableMemControl);
-      write(
-          insertRowPlan.getPrefixPath().getFullPath(),
-          measurementMNode.getSchema(),
-          insertRowPlan.getTime(),
-          vectorValue);
     } else {
       for (MeasurementMNode measurementMNode : measurementMNodes) {
         if (values[columnIndex] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 3e9af1b..3a94e9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -2201,9 +2201,7 @@ public class MManager {
         }
 
         // check type is match
-        boolean mismatch = false;
         TSDataType insertDataType;
-        DataTypeMismatchException mismatchException = null;
         if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
           if (plan.isAligned()) {
             TSDataType dataTypeInNode =
@@ -2213,16 +2211,27 @@ public class MManager {
               insertDataType = dataTypeInNode;
             }
             if (dataTypeInNode != insertDataType) {
-              mismatch = true;
               logger.warn(
                   "DataType mismatch, Insert measurement {} in {} type {}, metadata tree type {}",
                   measurementMNode.getSchema().getValueMeasurementIdList().get(i),
                   measurementList[i],
                   insertDataType,
                   dataTypeInNode);
-              mismatchException =
+              DataTypeMismatchException mismatchException =
                   new DataTypeMismatchException(measurementList[i], insertDataType, dataTypeInNode);
+              if (!config.isEnablePartialInsert()) {
+                throw mismatchException;
+              } else {
+                // mark failed measurement
+                plan.markFailedMeasurementAlignedInsertion(mismatchException);
+                for (int j = 0; j < i; j++) {
+                  // all the measurementMNodes should be null
+                  measurementMNodes[j] = null;
+                }
+                break;
+              }
             }
+            measurementMNodes[i] = measurementMNode;
           } else {
             if (plan instanceof InsertRowPlan) {
               if (!((InsertRowPlan) plan).isNeedInferType()) {
@@ -2234,34 +2243,28 @@ public class MManager {
             } else {
               insertDataType = getTypeInLoc(plan, i);
             }
-            mismatch = measurementMNode.getSchema().getType() != insertDataType;
-            if (mismatch) {
+            if (measurementMNode.getSchema().getType() != insertDataType) {
               logger.warn(
                   "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
                   measurementList[i],
                   insertDataType,
                   measurementMNode.getSchema().getType());
-              mismatchException =
+              DataTypeMismatchException mismatchException =
                   new DataTypeMismatchException(
                       measurementList[i], insertDataType, measurementMNode.getSchema().getType());
+              if (!config.isEnablePartialInsert()) {
+                throw mismatchException;
+              } else {
+                // mark failed measurement
+                plan.markFailedMeasurementInsertion(i, mismatchException);
+                continue;
+              }
             }
+            measurementMNodes[i] = measurementMNode;
+            // set measurementName instead of alias
+            measurementList[i] = measurementMNode.getName();
           }
         }
-
-        if (mismatch) {
-          if (!config.isEnablePartialInsert()) {
-            throw mismatchException;
-          } else {
-            // mark failed measurement
-            plan.markFailedMeasurementInsertion(i, mismatchException);
-            continue;
-          }
-        }
-
-        measurementMNodes[i] = measurementMNode;
-
-        // set measurementName instead of alias
-        measurementList[i] = measurementMNode.getName();
       } catch (MetadataException e) {
         logger.warn(
             "meet error when check {}.{}, message: {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index c05d19a..8d5ad71 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -120,6 +120,24 @@ public abstract class InsertPlan extends PhysicalPlan {
     measurements[index] = null;
   }
 
+  public void markFailedMeasurementAlignedInsertion(Exception e) {
+    if (failedMeasurements == null) {
+      failedMeasurements = new ArrayList<>();
+      failedExceptions = new ArrayList<>();
+      failedIndices = new ArrayList<>();
+    }
+
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
+      failedMeasurements.add(measurements[i]);
+      failedExceptions.add(e);
+      failedIndices.add(i);
+      measurements[i] = null;
+    }
+  }
+
   /**
    * Reconstruct this plan with the failed measurements.
    *
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 85422f4..5580b99 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -1370,8 +1370,10 @@ public class MManagerBasicTest {
       // call getSeriesSchemasAndReadLockDevice
       MNode mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
       assertEquals(4, mNode.getMeasurementMNodeCount());
+      assertNull(insertRowPlan.getMeasurementMNodes()[0]);
       assertNull(insertRowPlan.getMeasurementMNodes()[1]);
-      assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
+      assertNull(insertRowPlan.getMeasurementMNodes()[2]);
+      assertEquals(3, insertRowPlan.getFailedMeasurementNumber());
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index dbdabc3..179b32e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1192,10 +1192,11 @@ public class Session {
 
     TSInsertTabletReq request = new TSInsertTabletReq();
 
-    if (request.isAligned) {
+    if (tablet.isAligned()) {
       if (tablet.getSchemas().size() > 1) {
         throw new BatchExecutionException("One tablet should only contain one aligned timeseries!");
       }
+      request.setIsAligned(true);
       IMeasurementSchema measurementSchema = tablet.getSchemas().get(0);
       request.setPrefixPath(
           tablet.deviceId + TsFileConstant.PATH_SEPARATOR + measurementSchema.getMeasurementId());