You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/09 05:08:50 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3861]Adds null value handling logic to the Session API (#6857)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new af935518e9 [To rel/0.13][IOTDB-3861]Adds null value handling logic to the Session API (#6857)
af935518e9 is described below
commit af935518e9bff97a814236ffa9faf82f93c3d980
Author: suchenglong <40...@qq.com>
AuthorDate: Tue Aug 9 13:08:44 2022 +0800
[To rel/0.13][IOTDB-3861]Adds null value handling logic to the Session API (#6857)
---
.../iotdb/session/IoTDBSessionInsertNullT.java | 163 +++++
.../apache/iotdb/rpc/NoValidValueException.java | 29 +
.../java/org/apache/iotdb/session/Session.java | 687 +++++++++++++++++++--
3 files changed, 830 insertions(+), 49 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionInsertNullT.java b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionInsertNullT.java
new file mode 100644
index 0000000000..78c5fd1017
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionInsertNullT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test checking that the Session insert APIs ignore null values: a null entry must
+ * not be written, and a record made only of nulls must be skipped instead of failing.
+ */
+@Category({LocalStandaloneTest.class})
+public class IoTDBSessionInsertNullT {
+ private Session session;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ EnvironmentUtils.envSetUp();
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // setUp may fail before the session exists, and close() may throw; the environment must be
+ // cleaned in both cases so later tests start from a fresh state.
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } finally {
+ EnvironmentUtils.cleanEnv();
+ }
+ }
+
+ /** Single-record inserts: s1 gets a real value at 100, 300 and 400 only, so count(s1) is 3. */
+ @Test
+ public void testInsertRecordNull() throws StatementExecutionException, IoTDBConnectionException {
+ String deviceId = "root.sg1.clsu.d1";
+ session.insertRecord(deviceId, 100, Arrays.asList("s1"), Arrays.asList("true"));
+ // String-typed record whose only value is null: expected to be ignored as a whole.
+ List<String> t = new ArrayList<>();
+ t.add(null);
+ session.insertRecord(deviceId, 200, Arrays.asList("s1"), t);
+ session.insertRecord(
+ deviceId,
+ 300,
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+ Arrays.asList(true, 30));
+ // Partially null record: s1 is kept, s2 is dropped.
+ session.insertRecord(
+ deviceId,
+ 400,
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+ Arrays.asList(true, null));
+ // Fully null typed record: expected to be ignored as a whole.
+ session.insertRecord(
+ deviceId,
+ 500,
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+ Arrays.asList(null, null));
+ long nums = queryCountRecords("select count(s1) from " + deviceId);
+ assertEquals(3, nums);
+ }
+
+ /**
+ * Batch inserts over two devices. d2.s1 is non-null at 200, 300 and 500; d3.s2 is non-null at
+ * 200, 300 and 400, so both counts are 3.
+ */
+ @Test
+ public void testInsertRecordsNull() throws StatementExecutionException, IoTDBConnectionException {
+ String deviceId1 = "root.sg1.clsu.d2";
+ // NOTE(review): no d3 series is created in prepareData; presumably the server creates the
+ // schema automatically - confirm against the test configuration.
+ String deviceId2 = "root.sg1.clsu.d3";
+ session.insertRecords(
+ Arrays.asList(deviceId1, deviceId2),
+ Arrays.asList(300L, 300L),
+ Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+ Arrays.asList(
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32)),
+ Arrays.asList(Arrays.asList(true, 101), Arrays.asList(false, 201)));
+ session.insertRecords(
+ Arrays.asList(deviceId1, deviceId2),
+ Arrays.asList(200L, 200L),
+ Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+ Arrays.asList(Arrays.asList("false", "101"), Arrays.asList("true", "201")));
+ // String-typed batch with one null value (d2.s1).
+ session.insertRecords(
+ Arrays.asList(deviceId1, deviceId2),
+ Arrays.asList(400L, 400L),
+ Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+ Arrays.asList(Arrays.asList(null, "102"), Arrays.asList("false", "202")));
+ // Typed batch: d2 is partially null, d3 is entirely null.
+ session.insertRecords(
+ Arrays.asList(deviceId1, deviceId2),
+ Arrays.asList(500L, 500L),
+ Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+ Arrays.asList(
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+ Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32)),
+ Arrays.asList(Arrays.asList(true, null), Arrays.asList(null, null)));
+ long nums = queryCountRecords("select count(s1) from " + deviceId1);
+ assertEquals(3, nums);
+ nums = queryCountRecords("select count(s2) from " + deviceId2);
+ assertEquals(3, nums);
+ }
+
+ /** Opens the session and creates the storage group and the time series used by the tests. */
+ private void prepareData() throws IoTDBConnectionException, StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ session.setStorageGroup("root.sg1");
+ session.createTimeseries(
+ "root.sg1.clsu.d1.s1", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.clsu.d1.s2", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.clsu.d1.s3", TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.clsu.d1.s4", TSDataType.FLOAT, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.clsu.d1.s5", TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.clsu.d1.s6", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.clsu.d2.s1", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
+ }
+
+ /**
+ * Runs a count query and returns the first field of the last returned row as a long.
+ *
+ * @param sql query whose first column is an INT32 or INT64 count
+ * @return the count, or 0 if no row (or no integer field) was returned
+ */
+ public long queryCountRecords(String sql)
+ throws StatementExecutionException, IoTDBConnectionException {
+ SessionDataSet dataSetWrapper = session.executeQueryStatement(sql, 1000);
+ long count = 0;
+ try {
+ while (dataSetWrapper.hasNext()) {
+ RowRecord record = dataSetWrapper.next();
+ Field field = record.getFields().get(0);
+ switch (field.getDataType()) {
+ case INT32:
+ count = field.getIntV();
+ break;
+ case INT64:
+ count = field.getLongV();
+ break;
+ default:
+ // Other data types are not counts; keep the previous value.
+ break;
+ }
+ }
+ } finally {
+ // Release the server-side query handle even when reading a row fails.
+ dataSetWrapper.closeOperationHandle();
+ }
+ return count;
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/NoValidValueException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/NoValidValueException.java
new file mode 100644
index 0000000000..ce69ec1e27
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/NoValidValueException.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.rpc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Unchecked exception signalling that an insert request holds no valid (non-null) value, so
+ * there is nothing left to send.
+ */
+public class NoValidValueException extends RuntimeException {
+ // RuntimeException is Serializable; pin the version so it does not depend on compiler output.
+ private static final long serialVersionUID = 1L;
+
+ /** Creates the exception without a detail message. */
+ public NoValidValueException() {
+ super();
+ }
+
+ /**
+ * Creates the exception with a detail message.
+ *
+ * @param message description of why no valid value is available
+ */
+ public NoValidValueException(String message) {
+ super(message);
+ }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 828540fe57..99fe43b1d8 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.session;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.NoValidValueException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
@@ -766,8 +767,20 @@ public class Session {
List<TSDataType> types,
Object... values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request =
- genTSInsertRecordReq(deviceId, time, measurements, types, Arrays.asList(values), false);
+ TSInsertRecordReq request;
+ try {
+ request =
+ filterAndGenTSInsertRecordReq(
+ deviceId, time, measurements, types, Arrays.asList(values), false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return;
+ }
+
insertRecord(deviceId, request);
}
@@ -915,8 +928,18 @@ public class Session {
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
// not vector by default
- TSInsertRecordReq request =
- genTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+ TSInsertRecordReq request;
+ try {
+ request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return;
+ }
+
insertRecord(deviceId, request);
}
@@ -934,11 +957,41 @@ public class Session {
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request =
- genTSInsertRecordReq(deviceId, time, measurements, types, values, true);
+ TSInsertRecordReq request;
+ try {
+ request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, true);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return;
+ }
insertRecord(deviceId, request);
}
+ /**
+ * Builds a single-record insert request, first dropping every measurement whose value is null.
+ * Filtering runs on copies, so the lists passed in are not modified.
+ *
+ * @throws NoValidValueException if every value of the record is null
+ */
+ private TSInsertRecordReq filterAndGenTSInsertRecordReq(
+ String prefixPath,
+ long time,
+ List<String> measurements,
+ List<TSDataType> types,
+ List<Object> values,
+ boolean isAligned)
+ throws IoTDBConnectionException {
+ if (!hasNull(values)) {
+ // Nothing to filter: pass the original lists straight to the request builder.
+ return genTSInsertRecordReq(prefixPath, time, measurements, types, values, isAligned);
+ }
+ List<String> keptMeasurements = new ArrayList<>(measurements);
+ List<Object> keptValues = new ArrayList<>(values);
+ List<TSDataType> keptTypes = new ArrayList<>(types);
+ if (filterNullValueAndMeasurement(prefixPath, keptMeasurements, keptTypes, keptValues)) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ return genTSInsertRecordReq(
+ prefixPath, time, keptMeasurements, keptTypes, keptValues, isAligned);
+ }
+
private TSInsertRecordReq genTSInsertRecordReq(
String prefixPath,
long time,
@@ -967,8 +1020,17 @@ public class Session {
public void insertRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertStringRecordReq request =
- genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+ TSInsertStringRecordReq request;
+ try {
+ request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return;
+ }
insertRecord(deviceId, request);
}
@@ -982,11 +1044,38 @@ public class Session {
public void insertAlignedRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertStringRecordReq request =
- genTSInsertStringRecordReq(deviceId, time, measurements, values, true);
+ TSInsertStringRecordReq request;
+ try {
+ request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, true);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return;
+ }
insertRecord(deviceId, request);
}
+ /**
+ * Builds a single string-typed insert request, first dropping every measurement whose value is
+ * null. Filtering runs on copies, so the lists passed in are not modified.
+ *
+ * @throws NoValidValueException if every value of the record is null
+ */
+ private TSInsertStringRecordReq filterAndGenTSInsertStringRecordReq(
+ String prefixPath,
+ long time,
+ List<String> measurements,
+ List<String> values,
+ boolean isAligned) {
+ if (!hasNull(values)) {
+ // Nothing to filter: pass the original lists straight to the request builder.
+ return genTSInsertStringRecordReq(prefixPath, time, measurements, values, isAligned);
+ }
+ List<String> keptMeasurements = new ArrayList<>(measurements);
+ List<String> keptValues = new ArrayList<>(values);
+ if (filterNullValueAndMeasurementWithStringType(keptValues, prefixPath, keptMeasurements)) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ return genTSInsertStringRecordReq(prefixPath, time, keptMeasurements, keptValues, isAligned);
+ }
+
private TSInsertStringRecordReq genTSInsertStringRecordReq(
String prefixPath,
long time,
@@ -1025,8 +1114,19 @@ public class Session {
if (enableCacheLeader) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
} else {
- TSInsertStringRecordsReq request =
- genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, false);
+ TSInsertStringRecordsReq request;
+ try {
+ request =
+ filterAndGenTSInsertStringRecordsReq(
+ deviceIds, times, measurementsList, valuesList, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+ deviceIds.toString(),
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
@@ -1038,6 +1138,210 @@ public class Session {
}
}
+ /**
+ * Strips null values (with the measurement and type at the same index) from every record, then
+ * drops each record left without any value. All five lists are modified in place and are read
+ * by the same record index.
+ *
+ * @param deviceIds device of each record
+ * @param times timestamp of each record
+ * @param measurementsList measurement names of each record
+ * @param valuesList values of each record
+ * @param typesList data types of each record
+ * @throws NoValidValueException if no record is left after filtering
+ */
+ private void filterNullValueAndMeasurement(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<Object>> valuesList,
+ List<List<TSDataType>> typesList) {
+ // Walk backwards so removing a record does not shift the records still to be visited.
+ for (int idx = valuesList.size() - 1; idx >= 0; idx--) {
+ List<Object> recordValues = valuesList.get(idx);
+ List<String> recordMeasurements = measurementsList.get(idx);
+ List<TSDataType> recordTypes = typesList.get(idx);
+ boolean recordIsEmpty =
+ filterNullValueAndMeasurement(
+ deviceIds.get(idx), recordMeasurements, recordTypes, recordValues);
+ if (!recordIsEmpty) {
+ continue;
+ }
+ valuesList.remove(idx);
+ measurementsList.remove(idx);
+ deviceIds.remove(idx);
+ times.remove(idx);
+ typesList.remove(idx);
+ }
+ if (valuesList.isEmpty()) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ }
+
+ /**
+ * Single-device variant: strips null values (with the measurement and type at the same index)
+ * from every record, then drops each record left without any value. The four lists are modified
+ * in place and are read by the same record index.
+ *
+ * @param deviceId device all records belong to
+ * @param times timestamp of each record
+ * @param measurementsList measurement names of each record
+ * @param typesList data types of each record
+ * @param valuesList values of each record
+ * @throws NoValidValueException if no record is left after filtering
+ */
+ private void filterNullValueAndMeasurementOfOneDevice(
+ String deviceId,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList) {
+ // Walk backwards so removing a record does not shift the records still to be visited.
+ for (int idx = valuesList.size() - 1; idx >= 0; idx--) {
+ List<Object> recordValues = valuesList.get(idx);
+ List<String> recordMeasurements = measurementsList.get(idx);
+ List<TSDataType> recordTypes = typesList.get(idx);
+ boolean recordIsEmpty =
+ filterNullValueAndMeasurement(deviceId, recordMeasurements, recordTypes, recordValues);
+ if (!recordIsEmpty) {
+ continue;
+ }
+ valuesList.remove(idx);
+ measurementsList.remove(idx);
+ typesList.remove(idx);
+ times.remove(idx);
+ }
+ if (valuesList.isEmpty()) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ }
+
+ /**
+ * Single-device, string-typed variant: strips null values (with the measurement at the same
+ * index) from every record, then drops each record left without any value. The three lists are
+ * modified in place and are read by the same record index.
+ *
+ * @param times timestamp of each record
+ * @param deviceId device all records belong to
+ * @param measurementsList measurement names of each record
+ * @param valuesList string values of each record
+ * @throws NoValidValueException if no record is left after filtering
+ */
+ private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
+ List<Long> times,
+ String deviceId,
+ List<List<String>> measurementsList,
+ List<List<String>> valuesList) {
+ // Walk backwards so removing a record does not shift the records still to be visited.
+ for (int idx = valuesList.size() - 1; idx >= 0; idx--) {
+ List<String> recordValues = valuesList.get(idx);
+ List<String> recordMeasurements = measurementsList.get(idx);
+ boolean recordIsEmpty =
+ filterNullValueAndMeasurementWithStringType(recordValues, deviceId, recordMeasurements);
+ if (!recordIsEmpty) {
+ continue;
+ }
+ valuesList.remove(idx);
+ measurementsList.remove(idx);
+ times.remove(idx);
+ }
+ if (valuesList.isEmpty()) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ }
+
+ /**
+ * Removes every null value from one record, together with the measurement name and data type at
+ * the same index. The three lists are modified in place and are read by the same index.
+ *
+ * @param deviceId device the record belongs to; only used in the log messages
+ * @param measurementsList measurement names of the record
+ * @param types data types of the record
+ * @param valuesList values of the record
+ * @return true if no value is left after filtering, false otherwise
+ */
+ private boolean filterNullValueAndMeasurement(
+ String deviceId,
+ List<String> measurementsList,
+ List<TSDataType> types,
+ List<Object> valuesList) {
+ // Measurements whose value was null, kept only for logging.
+ Map<String, Object> nullMap = new HashMap<>();
+ // Iterate backwards so removals do not shift the indexes still to be visited.
+ for (int i = valuesList.size() - 1; i >= 0; i--) {
+ if (valuesList.get(i) == null) {
+ nullMap.put(measurementsList.get(i), valuesList.get(i));
+ valuesList.remove(i);
+ measurementsList.remove(i);
+ types.remove(i);
+ }
+ }
+ if (valuesList.isEmpty()) {
+ logger.info("All values of the {} are null,null values are {}", deviceId, nullMap.toString());
+ return true;
+ }
+ // Batch callers run this on every record, including records without any null; only report a
+ // record that actually lost a value so the log does not claim nulls that were never there.
+ if (!nullMap.isEmpty()) {
+ logger.info("Some values of {} are null,null values are {}", deviceId, nullMap.toString());
+ }
+ return false;
+ }
+
+ /**
+ * String-typed batch variant: strips null values (with the measurement at the same index) from
+ * every record, then drops each record left without any value. The four lists are modified in
+ * place and are read by the same record index.
+ *
+ * @param prefixPaths device path of each record
+ * @param times timestamp of each record
+ * @param measurementsList measurement names of each record
+ * @param valuesList string values of each record
+ * @throws NoValidValueException if no record is left after filtering
+ */
+ private void filterNullValueAndMeasurementWithStringType(
+ List<String> prefixPaths,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<String>> valuesList) {
+ // Walk backwards so removing a record does not shift the records still to be visited.
+ for (int idx = valuesList.size() - 1; idx >= 0; idx--) {
+ List<String> recordValues = valuesList.get(idx);
+ List<String> recordMeasurements = measurementsList.get(idx);
+ boolean recordIsEmpty =
+ filterNullValueAndMeasurementWithStringType(
+ recordValues, prefixPaths.get(idx), recordMeasurements);
+ if (!recordIsEmpty) {
+ continue;
+ }
+ valuesList.remove(idx);
+ measurementsList.remove(idx);
+ times.remove(idx);
+ prefixPaths.remove(idx);
+ }
+ if (valuesList.isEmpty()) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ }
+
+ /**
+ * Removes every null value from one string-typed record, together with the measurement name at
+ * the same index. Both lists are modified in place and are read by the same index.
+ *
+ * @param valuesList string values of the record
+ * @param deviceId device the record belongs to; only used in the log messages
+ * @param measurementsList measurement names of the record
+ * @return true if no value is left after filtering, false otherwise
+ */
+ private boolean filterNullValueAndMeasurementWithStringType(
+ List<String> valuesList, String deviceId, List<String> measurementsList) {
+ // Measurements whose value was null, kept only for logging.
+ Map<String, Object> nullMap = new HashMap<>();
+ // Iterate backwards so removals do not shift the indexes still to be visited.
+ for (int i = valuesList.size() - 1; i >= 0; i--) {
+ if (valuesList.get(i) == null) {
+ nullMap.put(measurementsList.get(i), valuesList.get(i));
+ valuesList.remove(i);
+ measurementsList.remove(i);
+ }
+ }
+ if (valuesList.isEmpty()) {
+ logger.info("All values of the {} are null,null values are {}", deviceId, nullMap.toString());
+ return true;
+ }
+ // Batch callers run this on every record, including records without any null; only report a
+ // record that actually lost a value so the log does not claim nulls that were never there.
+ if (!nullMap.isEmpty()) {
+ logger.info("Some values of {} are null,null values are {}", deviceId, nullMap.toString());
+ }
+ return false;
+ }
+
+ /**
+ * Tells whether the list, or any list nested inside it, contains a null element.
+ *
+ * @param valuesList list to inspect; elements that are themselves lists are searched recursively
+ * @return true as soon as a null element is found, false if there is none
+ */
+ private boolean hasNull(List valuesList) {
+ for (Object element : valuesList) {
+ if (element == null) {
+ return true;
+ }
+ if (element instanceof List && hasNull((List) element)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Insert aligned multiple rows, which can reduce the overhead of network. This method is just
* like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
@@ -1062,8 +1366,20 @@ public class Session {
if (enableCacheLeader) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
} else {
- TSInsertStringRecordsReq request =
- genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, true);
+ TSInsertStringRecordsReq request;
+ try {
+ request =
+ filterAndGenTSInsertStringRecordsReq(
+ deviceIds, times, measurementsList, valuesList, true);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+ deviceIds.toString(),
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
+
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
@@ -1088,13 +1404,77 @@ public class Session {
TSInsertStringRecordsReq request =
recordsGroup.computeIfAbsent(connection, k -> new TSInsertStringRecordsReq());
request.setIsAligned(isAligned);
- updateTSInsertStringRecordsReq(
- request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
+ try {
+ filterAndUpdateTSInsertStringRecordsReq(
+ request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceIds.get(i),
+ times.get(i),
+ measurementsList.get(i).toString());
+ continue;
+ }
}
insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
+ /**
+ * Builds a string-typed multi-record insert request, first removing null values and the records
+ * that consist only of nulls. Filtering works on the lists returned by
+ * changeToArrayListWithStringType and on copies of prefixPaths and time.
+ *
+ * @throws NoValidValueException if no record is left after filtering
+ */
+ private TSInsertStringRecordsReq filterAndGenTSInsertStringRecordsReq(
+ List<String> prefixPaths,
+ List<Long> time,
+ List<List<String>> measurements,
+ List<List<String>> values,
+ boolean isAligned) {
+ if (!hasNull(values)) {
+ // Nothing to filter: pass the original lists straight to the request builder.
+ return genTSInsertStringRecordsReq(prefixPaths, time, measurements, values, isAligned);
+ }
+ List<List<String>> mutableValues = changeToArrayListWithStringType(values);
+ List<List<String>> mutableMeasurements = changeToArrayListWithStringType(measurements);
+ List<String> mutablePaths = new ArrayList<>(prefixPaths);
+ List<Long> mutableTimes = new ArrayList<>(time);
+ filterNullValueAndMeasurementWithStringType(
+ mutablePaths, mutableTimes, mutableMeasurements, mutableValues);
+ return genTSInsertStringRecordsReq(
+ mutablePaths, mutableTimes, mutableMeasurements, mutableValues, isAligned);
+ }
+
+ /**
+ * Returns a mutable copy of the given lists: a new outer list holding a new ArrayList for every
+ * inner list (the string elements themselves are shared).
+ *
+ * <p>The copy is unconditional on purpose. The result is filtered in place afterwards, and
+ * reusing an ArrayList handed in by the caller would remove entries from the caller's own data.
+ *
+ * @param values lists to copy
+ * @return independent mutable lists with the same content
+ */
+ private List<List<String>> changeToArrayListWithStringType(List<List<String>> values) {
+ List<List<String>> copy = new ArrayList<>(values.size());
+ for (List<String> currentValue : values) {
+ copy.add(new ArrayList<>(currentValue));
+ }
+ return copy;
+ }
+
+ /**
+ * Returns a mutable copy of the given lists: a new outer list holding a new ArrayList for every
+ * inner list (the value objects themselves are shared).
+ *
+ * <p>The copy is unconditional on purpose. The result is filtered in place afterwards, and
+ * reusing an ArrayList handed in by the caller would remove entries from the caller's own data.
+ *
+ * @param values lists to copy
+ * @return independent mutable lists with the same content
+ */
+ private List<List<Object>> changeToArrayList(List<List<Object>> values) {
+ List<List<Object>> copy = new ArrayList<>(values.size());
+ for (List<Object> currentValue : values) {
+ copy.add(new ArrayList<>(currentValue));
+ }
+ return copy;
+ }
+
+ /**
+ * Returns a mutable copy of the given lists: a new outer list holding a new ArrayList for every
+ * inner list of data types.
+ *
+ * <p>The copy is unconditional on purpose. The result is filtered in place afterwards, and
+ * reusing an ArrayList handed in by the caller would remove entries from the caller's own data.
+ *
+ * @param values lists to copy
+ * @return independent mutable lists with the same content
+ */
+ private List<List<TSDataType>> changeToArrayListWithTSDataType(List<List<TSDataType>> values) {
+ List<List<TSDataType>> copy = new ArrayList<>(values.size());
+ for (List<TSDataType> currentValue : values) {
+ copy.add(new ArrayList<>(currentValue));
+ }
+ return copy;
+ }
+
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
List<String> prefixPaths,
List<Long> time,
@@ -1102,7 +1482,6 @@ public class Session {
List<List<String>> values,
boolean isAligned) {
TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
-
request.setPrefixPaths(prefixPaths);
request.setTimestamps(time);
request.setMeasurementsList(measurements);
@@ -1111,6 +1490,24 @@ public class Session {
return request;
}
+ /**
+ * Adds one string-typed record to the given request, first dropping every measurement whose
+ * value is null. Filtering runs on copies, so the lists passed in are not modified.
+ *
+ * @throws NoValidValueException if every value of the record is null; the request is not
+ * updated in that case
+ */
+ private void filterAndUpdateTSInsertStringRecordsReq(
+ TSInsertStringRecordsReq request,
+ String deviceId,
+ long time,
+ List<String> measurements,
+ List<String> values) {
+ if (!hasNull(values)) {
+ // Nothing to filter: append the record as it was given.
+ updateTSInsertStringRecordsReq(request, deviceId, time, measurements, values);
+ return;
+ }
+ List<String> keptMeasurements = new ArrayList<>(measurements);
+ List<String> keptValues = new ArrayList<>(values);
+ if (filterNullValueAndMeasurementWithStringType(keptValues, deviceId, keptMeasurements)) {
+ throw new NoValidValueException("All inserted data is null.");
+ }
+ updateTSInsertStringRecordsReq(request, deviceId, time, keptMeasurements, keptValues);
+ }
+
private void updateTSInsertStringRecordsReq(
TSInsertStringRecordsReq request,
String deviceId,
@@ -1148,8 +1545,19 @@ public class Session {
insertRecordsWithLeaderCache(
deviceIds, times, measurementsList, typesList, valuesList, false);
} else {
- TSInsertRecordsReq request =
- genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, false);
+ TSInsertRecordsReq request;
+ try {
+ request =
+ filterAndGenTSInsertRecordsReq(
+ deviceIds, times, measurementsList, typesList, valuesList, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+ deviceIds.toString(),
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
@@ -1186,8 +1594,19 @@ public class Session {
if (enableCacheLeader) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
} else {
- TSInsertRecordsReq request =
- genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, true);
+ TSInsertRecordsReq request;
+ try {
+ request =
+ filterAndGenTSInsertRecordsReq(
+ deviceIds, times, measurementsList, typesList, valuesList, true);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+ deviceIds.toString(),
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
@@ -1241,9 +1660,19 @@ public class Session {
throw new IllegalArgumentException(
"times, measurementsList and valuesList's size should be equal");
}
- TSInsertRecordsOfOneDeviceReq request =
- genTSInsertRecordsOfOneDeviceReq(
- deviceId, times, measurementsList, typesList, valuesList, haveSorted, false);
+ TSInsertRecordsOfOneDeviceReq request;
+ try {
+ request =
+ filterAndGenTSInsertRecordsOfOneDeviceReq(
+ deviceId, times, measurementsList, typesList, valuesList, haveSorted, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+ deviceId,
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
@@ -1273,9 +1702,19 @@ public class Session {
throw new IllegalArgumentException(
"times, measurementsList and valuesList's size should be equal");
}
- TSInsertStringRecordsOfOneDeviceReq req =
- genTSInsertStringRecordsOfOneDeviceReq(
- deviceId, times, measurementsList, valuesList, haveSorted, false);
+ TSInsertStringRecordsOfOneDeviceReq req;
+ try {
+ req =
+ filterAndGenTSInsertStringRecordsOfOneDeviceReq(
+ deviceId, times, measurementsList, valuesList, haveSorted, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+ deviceId,
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
@@ -1343,9 +1782,19 @@ public class Session {
throw new IllegalArgumentException(
"times, subMeasurementsList and valuesList's size should be equal");
}
- TSInsertRecordsOfOneDeviceReq request =
- genTSInsertRecordsOfOneDeviceReq(
- deviceId, times, measurementsList, typesList, valuesList, haveSorted, true);
+ TSInsertRecordsOfOneDeviceReq request;
+ try {
+ request =
+ filterAndGenTSInsertRecordsOfOneDeviceReq(
+ deviceId, times, measurementsList, typesList, valuesList, haveSorted, true);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+ deviceId,
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
@@ -1375,9 +1824,19 @@ public class Session {
throw new IllegalArgumentException(
"times, measurementsList and valuesList's size should be equal");
}
- TSInsertStringRecordsOfOneDeviceReq req =
- genTSInsertStringRecordsOfOneDeviceReq(
- deviceId, times, measurementsList, valuesList, haveSorted, true);
+ TSInsertStringRecordsOfOneDeviceReq req;
+ try {
+ req =
+ filterAndGenTSInsertStringRecordsOfOneDeviceReq(
+ deviceId, times, measurementsList, valuesList, haveSorted, true);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+ deviceId,
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
try {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
@@ -1404,6 +1863,27 @@ public class Session {
insertAlignedStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList, false);
}
+ /**
+ * Builds a multi-record insert request for one device, first removing null values and the
+ * records that consist only of nulls. Filtering works on the lists returned by the
+ * changeToArrayList helpers and on a copy of times.
+ *
+ * @throws NoValidValueException if no record is left after filtering
+ */
+ private TSInsertRecordsOfOneDeviceReq filterAndGenTSInsertRecordsOfOneDeviceReq(
+ String prefixPath,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean haveSorted,
+ boolean isAligned)
+ throws IoTDBConnectionException, BatchExecutionException {
+ if (!hasNull(valuesList)) {
+ // Nothing to filter: pass the original lists straight to the request builder.
+ return genTSInsertRecordsOfOneDeviceReq(
+ prefixPath, times, measurementsList, typesList, valuesList, haveSorted, isAligned);
+ }
+ List<List<String>> mutableMeasurements = changeToArrayListWithStringType(measurementsList);
+ List<List<Object>> mutableValues = changeToArrayList(valuesList);
+ List<List<TSDataType>> mutableTypes = changeToArrayListWithTSDataType(typesList);
+ List<Long> mutableTimes = new ArrayList<>(times);
+ filterNullValueAndMeasurementOfOneDevice(
+ prefixPath, mutableTimes, mutableMeasurements, mutableTypes, mutableValues);
+ return genTSInsertRecordsOfOneDeviceReq(
+ prefixPath,
+ mutableTimes,
+ mutableMeasurements,
+ mutableTypes,
+ mutableValues,
+ haveSorted,
+ isAligned);
+ }
+
private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(
String prefixPath,
List<Long> times,
@@ -1419,7 +1899,6 @@ public class Session {
throw new IllegalArgumentException(
"times, measurementsList and valuesList's size should be equal");
}
-
if (!checkSorted(times)) {
// sort
Integer[] index = new Integer[times.size()];
@@ -1446,6 +1925,24 @@ public class Session {
return request;
}
+ private TSInsertStringRecordsOfOneDeviceReq filterAndGenTSInsertStringRecordsOfOneDeviceReq( // null-filtering front end for genTSInsertStringRecordsOfOneDeviceReq (String values, single device)
+ String prefixPath,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<String>> valuesList,
+ boolean haveSorted,
+ boolean isAligned) {
+ if (hasNull(valuesList)) { // copy-and-filter path is taken only when hasNull reports a null in the values
+ measurementsList = changeToArrayListWithStringType(measurementsList); // rebinds the local only; presumably returns a mutable copy (helper not shown here - TODO confirm)
+ valuesList = changeToArrayListWithStringType(valuesList); // same helper is reused because values are Strings in this variant
+ times = new ArrayList<>(times); // the filter below receives this copy, not the caller's list
+ filterNullValueAndMeasurementWithStringTypeOfOneDevice( // note: times comes before prefixPath in this helper's argument order
+ times, prefixPath, measurementsList, valuesList); // presumably the source of the NoValidValueException that a call site of this method catches - TODO confirm
+ }
+ return genTSInsertStringRecordsOfOneDeviceReq( // request is built from the (possibly replaced) local lists
+ prefixPath, times, measurementsList, valuesList, haveSorted, isAligned);
+ }
+
private TSInsertStringRecordsOfOneDeviceReq genTSInsertStringRecordsOfOneDeviceReq(
String prefixPath,
List<Long> times,
@@ -1521,17 +2018,46 @@ public class Session {
TSInsertRecordsReq request =
recordsGroup.computeIfAbsent(connection, k -> new TSInsertRecordsReq());
request.setIsAligned(isAligned);
- updateTSInsertRecordsReq(
- request,
- deviceIds.get(i),
- times.get(i),
- measurementsList.get(i),
- typesList.get(i),
- valuesList.get(i));
+ try {
+ filterAndUpdateTSInsertRecordsReq(
+ request,
+ deviceIds.get(i),
+ times.get(i),
+ measurementsList.get(i),
+ typesList.get(i),
+ valuesList.get(i));
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]",
+ deviceIds.get(i),
+ times.get(i),
+ measurementsList.get(i).toString());
+ continue;
+ }
}
insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
+ private TSInsertRecordsReq filterAndGenTSInsertRecordsReq( // null-filtering front end for genTSInsertRecordsReq (typed values, one device id per record)
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean isAligned)
+ throws IoTDBConnectionException {
+ if (hasNull(valuesList)) { // copy-and-filter path is taken only when hasNull reports a null in the values
+ measurementsList = changeToArrayListWithStringType(measurementsList); // rebinds the local only; presumably returns a mutable copy (helper not shown here - TODO confirm)
+ valuesList = changeToArrayList(valuesList);
+ deviceIds = new ArrayList<>(deviceIds); // the filter below receives copies of deviceIds and times, not the caller's lists
+ times = new ArrayList<>(times);
+ typesList = changeToArrayListWithTSDataType(typesList);
+ filterNullValueAndMeasurement(deviceIds, times, measurementsList, valuesList, typesList); // note: values precede types in this overload; presumably throws NoValidValueException when nothing is left - TODO confirm
+ }
+ return genTSInsertRecordsReq( // request is built from the (possibly replaced) local lists
+ deviceIds, times, measurementsList, typesList, valuesList, isAligned);
+ }
+
private TSInsertRecordsReq genTSInsertRecordsReq(
List<String> deviceIds,
List<Long> times,
@@ -1550,6 +2076,27 @@ public class Session {
return request;
}
+ private void filterAndUpdateTSInsertRecordsReq( // null-filtering front end for updateTSInsertRecordsReq: handles one record destined for an existing request
+ TSInsertRecordsReq request,
+ String deviceId,
+ Long time,
+ List<String> measurements,
+ List<TSDataType> types,
+ List<Object> values)
+ throws IoTDBConnectionException {
+ if (hasNull(values)) { // copy-and-filter path is taken only when hasNull reports a null in this record's values
+ measurements = new ArrayList<>(measurements); // shallow copies: the filter below receives these, not the caller's lists
+ types = new ArrayList<>(types);
+ values = new ArrayList<>(values);
+ boolean isAllValuesNull =
+ filterNullValueAndMeasurement(deviceId, measurements, types, values); // result is treated as "every value was null"; presumably also strips the null entries in place - TODO confirm
+ if (isAllValuesNull) {
+ throw new NoValidValueException("All inserted data is null."); // request is left untouched by this method in that case; the visible call site catches this and logs a warning
+ }
+ }
+ updateTSInsertRecordsReq(request, deviceId, time, measurements, types, values); // reached with either the original lists (no nulls) or the filtered copies
+ }
+
private void updateTSInsertRecordsReq(
TSInsertRecordsReq request,
String deviceId,
@@ -1819,8 +2366,20 @@ public class Session {
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertStringRecordsReq request =
- genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, false);
+ TSInsertStringRecordsReq request;
+ try {
+ request =
+ filterAndGenTSInsertStringRecordsReq(
+ deviceIds, times, measurementsList, valuesList, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+ deviceIds.toString(),
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
+
defaultSessionConnection.testInsertRecords(request);
}
@@ -1835,8 +2394,20 @@ public class Session {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordsReq request =
- genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, false);
+ TSInsertRecordsReq request;
+ try {
+ request =
+ filterAndGenTSInsertRecordsReq(
+ deviceIds, times, measurementsList, typesList, valuesList, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+ deviceIds.toString(),
+ times.toString(),
+ measurementsList.toString());
+ return;
+ }
+
defaultSessionConnection.testInsertRecords(request);
}
@@ -1847,8 +2418,17 @@ public class Session {
public void testInsertRecord(
String deviceId, long time, List<String> measurements, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertStringRecordReq request =
- genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+ TSInsertStringRecordReq request; // assigned inside the try so the all-null case can bail out before anything is sent
+ try {
+ request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false); // replaces the direct genTSInsertStringRecordReq call removed above; same arguments
+ } catch (NoValidValueException e) { // handled here rather than propagated: the call becomes a logged no-op
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return; // defaultSessionConnection.testInsertRecord is never invoked on this path
+ }
defaultSessionConnection.testInsertRecord(request);
}
@@ -1863,8 +2443,17 @@ public class Session {
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request =
- genTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+ TSInsertRecordReq request;
+ try {
+ request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+ } catch (NoValidValueException e) {
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+ deviceId,
+ time,
+ measurements.toString());
+ return;
+ }
defaultSessionConnection.testInsertRecord(request);
}