You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/07 01:54:20 UTC

[iotdb] branch insertVector created (now 33c0773)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch insertVector
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 33c0773  Support insert vector by insertStrRecord api

This branch includes the following new commits:

     new 33c0773  Support insert vector by insertStrRecord api

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: Support insert vector by insertStrRecord api

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch insertVector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 33c07733bf59b6500def0fd3a6f63b475fc23138
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Sep 7 09:53:38 2021 +0800

    Support insert vector by insertStrRecord api
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 49 +++++++++++------
 .../db/engine/storagegroup/TsFileProcessor.java    |  2 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 62 +++++++---------------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  1 +
 .../java/org/apache/iotdb/session/Session.java     | 17 +++++-
 thrift/src/main/thrift/rpc.thrift                  |  5 ++
 6 files changed, 72 insertions(+), 64 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 2bbde9a..13ca712 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -49,23 +49,24 @@ public class AlignedTimeseriesSessionExample {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open(false);
 
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    createTemplate();
-    createAlignedTimeseries();
-    insertAlignedRecord();
-
-    insertTabletWithAlignedTimeseriesMethod1();
-    insertTabletWithAlignedTimeseriesMethod2();
-    insertNullableTabletWithAlignedTimeseries();
-
-    selectTest();
-    selectWithValueFilterTest();
-    selectWithGroupByTest();
-    selectWithLastTest();
-
-    selectWithAggregationTest();
+    //    // set session fetchSize
+    //    session.setFetchSize(10000);
+    //
+    //    createTemplate();
+    //    createAlignedTimeseries();
+    //    insertAlignedRecord();
+    insertAlignedStrRecord();
+    //
+    //    insertTabletWithAlignedTimeseriesMethod1();
+    //    insertTabletWithAlignedTimeseriesMethod2();
+    //    insertNullableTabletWithAlignedTimeseries();
+    //
+    //    selectTest();
+    //    selectWithValueFilterTest();
+    //    selectWithGroupByTest();
+    //    selectWithLastTest();
+    //
+    //    selectWithAggregationTest();
 
     // selectWithAlignByDeviceTest();
 
@@ -386,4 +387,18 @@ public class AlignedTimeseriesSessionExample {
       session.insertRecord(ROOT_SG1_D1_VECTOR4, time, measurements, types, values, true);
     }
   }
+
+  private static void insertAlignedStrRecord()
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+
+    for (long time = 0; time < 1; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("3");
+      values.add("4");
+      session.insertRecord(ROOT_SG1_D1_VECTOR4, time, measurements, values, true);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 7745138..b17332b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -339,7 +339,7 @@ public class TsFileProcessor {
       if (workMemTable.checkIfChunkDoesNotExist(deviceId, insertRowPlan.getMeasurements()[i])) {
         // ChunkMetadataIncrement
         IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
-        if (schema.getType() == TSDataType.VECTOR) {
+        if (insertRowPlan.isAligned()) {
           chunkMetadataIncrement +=
               schema.getValueTSDataTypeList().size()
                   * ChunkMetadata.calculateRamSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index d201ae7..3db1059 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -239,7 +239,6 @@ public class InsertRowPlan extends InsertPlan {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void transferType() throws QueryProcessException {
     if (isNeedInferType) {
-      int columnIndex = 0;
       for (int i = 0; i < measurementMNodes.length; i++) {
         if (measurementMNodes[i] == null) {
           if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
@@ -255,52 +254,27 @@ public class InsertRowPlan extends InsertPlan {
                 new PathNotExistException(
                     prefixPath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
           }
-          columnIndex++;
           continue;
         }
-        if (measurementMNodes[i].getSchema().getType() != TSDataType.VECTOR) {
-          dataTypes[columnIndex] = measurementMNodes[i].getSchema().getType();
-          try {
-            values[columnIndex] =
-                CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
-          } catch (Exception e) {
-            logger.warn(
-                "{}.{} data type is not consistent, input {}, registered {}",
-                prefixPath,
-                measurements[i],
-                values[i],
-                dataTypes[i]);
-            if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-              markFailedMeasurementInsertion(i, e);
-              measurementMNodes[i] = null;
-            } else {
-              throw e;
-            }
-          }
-          columnIndex++;
+        if (isAligned) {
+          dataTypes[i] = measurementMNodes[i].getSchema().getValueTSDataTypeList().get(i);
+        } else {
+          dataTypes[i] = measurementMNodes[i].getSchema().getType();
         }
-        // for aligned timeseries
-        else {
-          for (TSDataType dataType : measurementMNodes[i].getSchema().getValueTSDataTypeList()) {
-            dataTypes[columnIndex] = dataType;
-            try {
-              values[columnIndex] =
-                  CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
-            } catch (Exception e) {
-              logger.warn(
-                  "{}.{} data type is not consistent, input {}, registered {}",
-                  prefixPath,
-                  measurements[i],
-                  values[columnIndex],
-                  dataTypes[columnIndex]);
-              if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-                markFailedMeasurementInsertion(i, e);
-                measurementMNodes[i] = null;
-              } else {
-                throw e;
-              }
-            }
-            columnIndex++;
+        try {
+          values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+        } catch (Exception e) {
+          logger.warn(
+              "{}.{} data type is not consistent, input {}, registered {}",
+              prefixPath,
+              measurements[i],
+              values[i],
+              dataTypes[i]);
+          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+            markFailedMeasurementInsertion(i, e);
+            measurementMNodes[i] = null;
+          } else {
+            throw e;
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6f6e25c..7d1ff92 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1635,6 +1635,7 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
+      plan.setAligned(req.isAligned);
 
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 039c186..6ce0281 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -912,17 +912,30 @@ public class Session {
       String deviceId, long time, List<String> measurements, List<String> values)
       throws IoTDBConnectionException, StatementExecutionException {
     TSInsertStringRecordReq request =
-        genTSInsertStringRecordReq(deviceId, time, measurements, values);
+        genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+    insertRecord(deviceId, request);
+  }
+
+  public void insertRecord(
+      String deviceId, long time, List<String> measurements, List<String> values, boolean isAligned)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertStringRecordReq request =
+        genTSInsertStringRecordReq(deviceId, time, measurements, values, isAligned);
     insertRecord(deviceId, request);
   }
 
   private TSInsertStringRecordReq genTSInsertStringRecordReq(
-      String deviceId, long time, List<String> measurements, List<String> values) {
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<String> values,
+      boolean isAligned) {
     TSInsertStringRecordReq request = new TSInsertStringRecordReq();
     request.setDeviceId(deviceId);
     request.setTimestamp(time);
     request.setMeasurements(measurements);
     request.setValues(values);
+    request.setIsAligned(isAligned);
     return request;
   }
 
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index f0079bb..7ef1668 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -211,6 +211,7 @@ struct TSInsertStringRecordReq {
   3: required list<string> measurements
   4: required list<string> values
   5: required i64 timestamp
+  6: optional bool isAligned
 }
 
 struct TSInsertTabletReq {
@@ -232,6 +233,7 @@ struct TSInsertTabletsReq {
   5: required list<binary> timestampsList
   6: required list<list<i32>> typesList
   7: required list<i32> sizeList
+  8: optional list<bool> isAlignedList
 }
 
 struct TSInsertRecordsReq {
@@ -240,6 +242,7 @@ struct TSInsertRecordsReq {
   3: required list<list<string>> measurementsList
   4: required list<binary> valuesList
   5: required list<i64> timestamps
+  6: optional list<bool> isAlignedList
 }
 
 struct TSInsertRecordsOfOneDeviceReq {
@@ -248,6 +251,7 @@ struct TSInsertRecordsOfOneDeviceReq {
     3: required list<list<string>> measurementsList
     4: required list<binary> valuesList
     5: required list<i64> timestamps
+    6: optional list<bool> isAlignedList
 }
 
 struct TSInsertStringRecordsReq {
@@ -256,6 +260,7 @@ struct TSInsertStringRecordsReq {
   3: required list<list<string>> measurementsList
   4: required list<list<string>> valuesList
   5: required list<i64> timestamps
+  6: optional list<bool> isAlignedList
 }
 
 struct TSDeleteDataReq {