You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/07 01:54:21 UTC

[iotdb] 01/01: Support insert vector by insertStrRecord api

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch insertVector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 33c07733bf59b6500def0fd3a6f63b475fc23138
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Sep 7 09:53:38 2021 +0800

    Support insert vector by insertStrRecord api
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 49 +++++++++++------
 .../db/engine/storagegroup/TsFileProcessor.java    |  2 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 62 +++++++---------------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  1 +
 .../java/org/apache/iotdb/session/Session.java     | 17 +++++-
 thrift/src/main/thrift/rpc.thrift                  |  5 ++
 6 files changed, 72 insertions(+), 64 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 2bbde9a..13ca712 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -49,23 +49,24 @@ public class AlignedTimeseriesSessionExample {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open(false);
 
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    createTemplate();
-    createAlignedTimeseries();
-    insertAlignedRecord();
-
-    insertTabletWithAlignedTimeseriesMethod1();
-    insertTabletWithAlignedTimeseriesMethod2();
-    insertNullableTabletWithAlignedTimeseries();
-
-    selectTest();
-    selectWithValueFilterTest();
-    selectWithGroupByTest();
-    selectWithLastTest();
-
-    selectWithAggregationTest();
+    //    // set session fetchSize
+    //    session.setFetchSize(10000);
+    //
+    //    createTemplate();
+    //    createAlignedTimeseries();
+    //    insertAlignedRecord();
+    insertAlignedStrRecord();
+    //
+    //    insertTabletWithAlignedTimeseriesMethod1();
+    //    insertTabletWithAlignedTimeseriesMethod2();
+    //    insertNullableTabletWithAlignedTimeseries();
+    //
+    //    selectTest();
+    //    selectWithValueFilterTest();
+    //    selectWithGroupByTest();
+    //    selectWithLastTest();
+    //
+    //    selectWithAggregationTest();
 
     // selectWithAlignByDeviceTest();
 
@@ -386,4 +387,18 @@ public class AlignedTimeseriesSessionExample {
       session.insertRecord(ROOT_SG1_D1_VECTOR4, time, measurements, types, values, true);
     }
   }
+
+  private static void insertAlignedStrRecord()
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+
+    for (long time = 0; time < 1; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("3");
+      values.add("4");
+      session.insertRecord(ROOT_SG1_D1_VECTOR4, time, measurements, values, true);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 7745138..b17332b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -339,7 +339,7 @@ public class TsFileProcessor {
       if (workMemTable.checkIfChunkDoesNotExist(deviceId, insertRowPlan.getMeasurements()[i])) {
         // ChunkMetadataIncrement
         IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
-        if (schema.getType() == TSDataType.VECTOR) {
+        if (insertRowPlan.isAligned()) {
           chunkMetadataIncrement +=
               schema.getValueTSDataTypeList().size()
                   * ChunkMetadata.calculateRamSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index d201ae7..3db1059 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -239,7 +239,6 @@ public class InsertRowPlan extends InsertPlan {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void transferType() throws QueryProcessException {
     if (isNeedInferType) {
-      int columnIndex = 0;
       for (int i = 0; i < measurementMNodes.length; i++) {
         if (measurementMNodes[i] == null) {
           if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
@@ -255,52 +254,27 @@ public class InsertRowPlan extends InsertPlan {
                 new PathNotExistException(
                     prefixPath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
           }
-          columnIndex++;
           continue;
         }
-        if (measurementMNodes[i].getSchema().getType() != TSDataType.VECTOR) {
-          dataTypes[columnIndex] = measurementMNodes[i].getSchema().getType();
-          try {
-            values[columnIndex] =
-                CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
-          } catch (Exception e) {
-            logger.warn(
-                "{}.{} data type is not consistent, input {}, registered {}",
-                prefixPath,
-                measurements[i],
-                values[i],
-                dataTypes[i]);
-            if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-              markFailedMeasurementInsertion(i, e);
-              measurementMNodes[i] = null;
-            } else {
-              throw e;
-            }
-          }
-          columnIndex++;
+        if (isAligned) {
+          dataTypes[i] = measurementMNodes[i].getSchema().getValueTSDataTypeList().get(i);
+        } else {
+          dataTypes[i] = measurementMNodes[i].getSchema().getType();
         }
-        // for aligned timeseries
-        else {
-          for (TSDataType dataType : measurementMNodes[i].getSchema().getValueTSDataTypeList()) {
-            dataTypes[columnIndex] = dataType;
-            try {
-              values[columnIndex] =
-                  CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
-            } catch (Exception e) {
-              logger.warn(
-                  "{}.{} data type is not consistent, input {}, registered {}",
-                  prefixPath,
-                  measurements[i],
-                  values[columnIndex],
-                  dataTypes[columnIndex]);
-              if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-                markFailedMeasurementInsertion(i, e);
-                measurementMNodes[i] = null;
-              } else {
-                throw e;
-              }
-            }
-            columnIndex++;
+        try {
+          values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+        } catch (Exception e) {
+          logger.warn(
+              "{}.{} data type is not consistent, input {}, registered {}",
+              prefixPath,
+              measurements[i],
+              values[i],
+              dataTypes[i]);
+          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+            markFailedMeasurementInsertion(i, e);
+            measurementMNodes[i] = null;
+          } else {
+            throw e;
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6f6e25c..7d1ff92 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1635,6 +1635,7 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
       plan.setValues(req.getValues().toArray(new Object[0]));
       plan.setNeedInferType(true);
+      plan.setAligned(req.isAligned);
 
       TSStatus status = checkAuthority(plan, req.getSessionId());
       return status != null ? status : executeNonQueryPlan(plan);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 039c186..6ce0281 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -912,17 +912,30 @@ public class Session {
       String deviceId, long time, List<String> measurements, List<String> values)
       throws IoTDBConnectionException, StatementExecutionException {
     TSInsertStringRecordReq request =
-        genTSInsertStringRecordReq(deviceId, time, measurements, values);
+        genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+    insertRecord(deviceId, request);
+  }
+
+  public void insertRecord(
+      String deviceId, long time, List<String> measurements, List<String> values, boolean isAligned)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertStringRecordReq request =
+        genTSInsertStringRecordReq(deviceId, time, measurements, values, isAligned);
     insertRecord(deviceId, request);
   }
 
   private TSInsertStringRecordReq genTSInsertStringRecordReq(
-      String deviceId, long time, List<String> measurements, List<String> values) {
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<String> values,
+      boolean isAligned) {
     TSInsertStringRecordReq request = new TSInsertStringRecordReq();
     request.setDeviceId(deviceId);
     request.setTimestamp(time);
     request.setMeasurements(measurements);
     request.setValues(values);
+    request.setIsAligned(isAligned);
     return request;
   }
 
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index f0079bb..7ef1668 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -211,6 +211,7 @@ struct TSInsertStringRecordReq {
   3: required list<string> measurements
   4: required list<string> values
   5: required i64 timestamp
+  6: optional bool isAligned
 }
 
 struct TSInsertTabletReq {
@@ -232,6 +233,7 @@ struct TSInsertTabletsReq {
   5: required list<binary> timestampsList
   6: required list<list<i32>> typesList
   7: required list<i32> sizeList
+  8: optional list<bool> isAlignedList
 }
 
 struct TSInsertRecordsReq {
@@ -240,6 +242,7 @@ struct TSInsertRecordsReq {
   3: required list<list<string>> measurementsList
   4: required list<binary> valuesList
   5: required list<i64> timestamps
+  6: optional list<bool> isAlignedList
 }
 
 struct TSInsertRecordsOfOneDeviceReq {
@@ -248,6 +251,7 @@ struct TSInsertRecordsOfOneDeviceReq {
     3: required list<list<string>> measurementsList
     4: required list<binary> valuesList
     5: required list<i64> timestamps
+    6: optional list<bool> isAlignedList
 }
 
 struct TSInsertStringRecordsReq {
@@ -256,6 +260,7 @@ struct TSInsertStringRecordsReq {
   3: required list<list<string>> measurementsList
   4: required list<list<string>> valuesList
   5: required list<i64> timestamps
+  6: optional list<bool> isAlignedList
 }
 
 struct TSDeleteDataReq {