You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/07 01:54:21 UTC
[iotdb] 01/01: Support insert vector by insertStrRecord api
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch insertVector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 33c07733bf59b6500def0fd3a6f63b475fc23138
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Sep 7 09:53:38 2021 +0800
Support insert vector by insertStrRecord api
---
.../iotdb/AlignedTimeseriesSessionExample.java | 49 +++++++++++------
.../db/engine/storagegroup/TsFileProcessor.java | 2 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 62 +++++++---------------
.../org/apache/iotdb/db/service/TSServiceImpl.java | 1 +
.../java/org/apache/iotdb/session/Session.java | 17 +++++-
thrift/src/main/thrift/rpc.thrift | 5 ++
6 files changed, 72 insertions(+), 64 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 2bbde9a..13ca712 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -49,23 +49,24 @@ public class AlignedTimeseriesSessionExample {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open(false);
- // set session fetchSize
- session.setFetchSize(10000);
-
- createTemplate();
- createAlignedTimeseries();
- insertAlignedRecord();
-
- insertTabletWithAlignedTimeseriesMethod1();
- insertTabletWithAlignedTimeseriesMethod2();
- insertNullableTabletWithAlignedTimeseries();
-
- selectTest();
- selectWithValueFilterTest();
- selectWithGroupByTest();
- selectWithLastTest();
-
- selectWithAggregationTest();
+ // // set session fetchSize
+ // session.setFetchSize(10000);
+ //
+ // createTemplate();
+ // createAlignedTimeseries();
+ // insertAlignedRecord();
+ insertAlignedStrRecord();
+ //
+ // insertTabletWithAlignedTimeseriesMethod1();
+ // insertTabletWithAlignedTimeseriesMethod2();
+ // insertNullableTabletWithAlignedTimeseries();
+ //
+ // selectTest();
+ // selectWithValueFilterTest();
+ // selectWithGroupByTest();
+ // selectWithLastTest();
+ //
+ // selectWithAggregationTest();
// selectWithAlignByDeviceTest();
@@ -386,4 +387,18 @@ public class AlignedTimeseriesSessionExample {
session.insertRecord(ROOT_SG1_D1_VECTOR4, time, measurements, types, values, true);
}
}
+
+ private static void insertAlignedStrRecord()
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+
+ for (long time = 0; time < 1; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("3");
+ values.add("4");
+ session.insertRecord(ROOT_SG1_D1_VECTOR4, time, measurements, values, true);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 7745138..b17332b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -339,7 +339,7 @@ public class TsFileProcessor {
if (workMemTable.checkIfChunkDoesNotExist(deviceId, insertRowPlan.getMeasurements()[i])) {
// ChunkMetadataIncrement
IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
- if (schema.getType() == TSDataType.VECTOR) {
+ if (insertRowPlan.isAligned()) {
chunkMetadataIncrement +=
schema.getValueTSDataTypeList().size()
* ChunkMetadata.calculateRamSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index d201ae7..3db1059 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -239,7 +239,6 @@ public class InsertRowPlan extends InsertPlan {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void transferType() throws QueryProcessException {
if (isNeedInferType) {
- int columnIndex = 0;
for (int i = 0; i < measurementMNodes.length; i++) {
if (measurementMNodes[i] == null) {
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
@@ -255,52 +254,27 @@ public class InsertRowPlan extends InsertPlan {
new PathNotExistException(
prefixPath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
}
- columnIndex++;
continue;
}
- if (measurementMNodes[i].getSchema().getType() != TSDataType.VECTOR) {
- dataTypes[columnIndex] = measurementMNodes[i].getSchema().getType();
- try {
- values[columnIndex] =
- CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
- } catch (Exception e) {
- logger.warn(
- "{}.{} data type is not consistent, input {}, registered {}",
- prefixPath,
- measurements[i],
- values[i],
- dataTypes[i]);
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- markFailedMeasurementInsertion(i, e);
- measurementMNodes[i] = null;
- } else {
- throw e;
- }
- }
- columnIndex++;
+ if (isAligned) {
+ dataTypes[i] = measurementMNodes[i].getSchema().getValueTSDataTypeList().get(i);
+ } else {
+ dataTypes[i] = measurementMNodes[i].getSchema().getType();
}
- // for aligned timeseries
- else {
- for (TSDataType dataType : measurementMNodes[i].getSchema().getValueTSDataTypeList()) {
- dataTypes[columnIndex] = dataType;
- try {
- values[columnIndex] =
- CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
- } catch (Exception e) {
- logger.warn(
- "{}.{} data type is not consistent, input {}, registered {}",
- prefixPath,
- measurements[i],
- values[columnIndex],
- dataTypes[columnIndex]);
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- markFailedMeasurementInsertion(i, e);
- measurementMNodes[i] = null;
- } else {
- throw e;
- }
- }
- columnIndex++;
+ try {
+ values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+ } catch (Exception e) {
+ logger.warn(
+ "{}.{} data type is not consistent, input {}, registered {}",
+ prefixPath,
+ measurements[i],
+ values[i],
+ dataTypes[i]);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ markFailedMeasurementInsertion(i, e);
+ measurementMNodes[i] = null;
+ } else {
+ throw e;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6f6e25c..7d1ff92 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1635,6 +1635,7 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setValues(req.getValues().toArray(new Object[0]));
plan.setNeedInferType(true);
+ plan.setAligned(req.isAligned);
TSStatus status = checkAuthority(plan, req.getSessionId());
return status != null ? status : executeNonQueryPlan(plan);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 039c186..6ce0281 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -912,17 +912,30 @@ public class Session {
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertStringRecordReq request =
- genTSInsertStringRecordReq(deviceId, time, measurements, values);
+ genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+ insertRecord(deviceId, request);
+ }
+
+ public void insertRecord(
+ String deviceId, long time, List<String> measurements, List<String> values, boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ TSInsertStringRecordReq request =
+ genTSInsertStringRecordReq(deviceId, time, measurements, values, isAligned);
insertRecord(deviceId, request);
}
private TSInsertStringRecordReq genTSInsertStringRecordReq(
- String deviceId, long time, List<String> measurements, List<String> values) {
+ String deviceId,
+ long time,
+ List<String> measurements,
+ List<String> values,
+ boolean isAligned) {
TSInsertStringRecordReq request = new TSInsertStringRecordReq();
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
request.setValues(values);
+ request.setIsAligned(isAligned);
return request;
}
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index f0079bb..7ef1668 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -211,6 +211,7 @@ struct TSInsertStringRecordReq {
3: required list<string> measurements
4: required list<string> values
5: required i64 timestamp
+ 6: optional bool isAligned
}
struct TSInsertTabletReq {
@@ -232,6 +233,7 @@ struct TSInsertTabletsReq {
5: required list<binary> timestampsList
6: required list<list<i32>> typesList
7: required list<i32> sizeList
+ 8: optional list<bool> isAlignedList
}
struct TSInsertRecordsReq {
@@ -240,6 +242,7 @@ struct TSInsertRecordsReq {
3: required list<list<string>> measurementsList
4: required list<binary> valuesList
5: required list<i64> timestamps
+ 6: optional list<bool> isAlignedList
}
struct TSInsertRecordsOfOneDeviceReq {
@@ -248,6 +251,7 @@ struct TSInsertRecordsOfOneDeviceReq {
3: required list<list<string>> measurementsList
4: required list<binary> valuesList
5: required list<i64> timestamps
+ 6: optional list<bool> isAlignedList
}
struct TSInsertStringRecordsReq {
@@ -256,6 +260,7 @@ struct TSInsertStringRecordsReq {
3: required list<list<string>> measurementsList
4: required list<list<string>> valuesList
5: required list<i64> timestamps
+ 6: optional list<bool> isAlignedList
}
struct TSDeleteDataReq {