You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/09 05:08:50 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3861]Adds null value handling logic to the Session API (#6857)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new af935518e9 [To rel/0.13][IOTDB-3861]Adds null value handling logic to the Session API (#6857)
af935518e9 is described below

commit af935518e9bff97a814236ffa9faf82f93c3d980
Author: suchenglong <40...@qq.com>
AuthorDate: Tue Aug 9 13:08:44 2022 +0800

    [To rel/0.13][IOTDB-3861]Adds null value handling logic to the Session API (#6857)
---
 .../iotdb/session/IoTDBSessionInsertNullT.java     | 163 +++++
 .../apache/iotdb/rpc/NoValidValueException.java    |  29 +
 .../java/org/apache/iotdb/session/Session.java     | 687 +++++++++++++++++++--
 3 files changed, 830 insertions(+), 49 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionInsertNullT.java b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionInsertNullT.java
new file mode 100644
index 0000000000..78c5fd1017
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/session/IoTDBSessionInsertNullT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBSessionInsertNullT { // checks that Session inserts skip null values
+  private Session session; // created and opened in prepareData(), closed in tearDown()
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.envSetUp();
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    session.close(); // NOTE(review): session is still null if setUp failed before prepareData()
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testInsertRecordNull() throws StatementExecutionException, IoTDBConnectionException {
+    String deviceId = "root.sg1.clsu.d1";
+    session.insertRecord(deviceId, 100, Arrays.asList("s1"), Arrays.asList("true"));
+    List<String> t = new ArrayList<>(); // string values for time 200: a single null
+    t.add(null);
+    session.insertRecord(deviceId, 200, Arrays.asList("s1"), t); // all values null: ignored
+    session.insertRecord(
+        deviceId,
+        300,
+        Arrays.asList("s1", "s2"),
+        Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+        Arrays.asList(true, 30));
+    session.insertRecord(
+        deviceId,
+        400,
+        Arrays.asList("s1", "s2"),
+        Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+        Arrays.asList(true, null)); // s2 is null and filtered out, s1 is still sent
+    session.insertRecord(
+        deviceId,
+        500,
+        Arrays.asList("s1", "s2"),
+        Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+        Arrays.asList(null, null)); // every value is null, so the whole record is ignored
+    long nums = queryCountRecords("select count(s1) from " + deviceId);
+    assertEquals(3, nums); // s1 is non-null at times 100, 300 and 400
+  }
+
+  @Test
+  public void testInsertRecordsNull() throws StatementExecutionException, IoTDBConnectionException {
+    String deviceId1 = "root.sg1.clsu.d2";
+    String deviceId2 = "root.sg1.clsu.d3";
+    session.insertRecords(
+        Arrays.asList(deviceId1, deviceId2),
+        Arrays.asList(300L, 300L), // typed batch without any null
+        Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+        Arrays.asList(
+            Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+            Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32)),
+        Arrays.asList(Arrays.asList(true, 101), Arrays.asList(false, 201)));
+    session.insertRecords(
+        Arrays.asList(deviceId1, deviceId2),
+        Arrays.asList(200L, 200L), // string batch without any null
+        Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+        Arrays.asList(Arrays.asList("false", "101"), Arrays.asList("true", "201")));
+    session.insertRecords(
+        Arrays.asList(deviceId1, deviceId2),
+        Arrays.asList(400L, 400L), // string batch: d2.s1 is null at this timestamp
+        Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+        Arrays.asList(Arrays.asList(null, "102"), Arrays.asList("false", "202")));
+    session.insertRecords(
+        Arrays.asList(deviceId1, deviceId2),
+        Arrays.asList(500L, 500L), // typed batch: d2.s2 is null, d3 has only null values
+        Arrays.asList(Arrays.asList("s1", "s2"), Arrays.asList("s1", "s2")),
+        Arrays.asList(
+            Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32),
+            Arrays.asList(TSDataType.BOOLEAN, TSDataType.INT32)),
+        Arrays.asList(Arrays.asList(true, null), Arrays.asList(null, null)));
+    long nums = queryCountRecords("select count(s1) from " + deviceId1);
+    assertEquals(3, nums); // d2.s1 is non-null at times 200, 300 and 500
+    nums = queryCountRecords("select count(s2) from " + deviceId2);
+    assertEquals(3, nums); // d3.s2 is non-null at times 200, 300 and 400
+  }
+
+  private void prepareData() throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    session.setStorageGroup("root.sg1");
+    session.createTimeseries(
+        "root.sg1.clsu.d1.s1", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    session.createTimeseries(
+        "root.sg1.clsu.d1.s2", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    session.createTimeseries( // d1.s3 to d1.s6 are created but not written by the tests above
+        "root.sg1.clsu.d1.s3", TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    session.createTimeseries(
+        "root.sg1.clsu.d1.s4", TSDataType.FLOAT, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    session.createTimeseries(
+        "root.sg1.clsu.d1.s5", TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    session.createTimeseries(
+        "root.sg1.clsu.d1.s6", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    session.createTimeseries( // only s1 of d2 is created here; d2.s2 and d3 are not
+        "root.sg1.clsu.d2.s1", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
+  }
+
+  public long queryCountRecords(String sql)
+      throws StatementExecutionException, IoTDBConnectionException {
+    SessionDataSet dataSetWrapper = session.executeQueryStatement(sql, 1000); // never closed here
+    long count = 0; // stays 0 if no row is returned or the first field is not INT32/INT64
+    while (dataSetWrapper.hasNext()) {
+      RowRecord record = dataSetWrapper.next();
+      Field field = record.getFields().get(0); // only the first column of each row is read
+      switch (field.getDataType()) {
+        case INT32:
+          count = field.getIntV();
+          break;
+        case INT64:
+          count = field.getLongV();
+          break;
+      }
+    }
+    return count; // value taken from the last row that was read
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/NoValidValueException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/NoValidValueException.java
new file mode 100644
index 0000000000..ce69ec1e27
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/NoValidValueException.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.rpc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/** Unchecked signal that an insert request has no non-null value left to send. */
+public class NoValidValueException extends RuntimeException {
+  public NoValidValueException() {}
+
+  /** @param message detail message describing why nothing is left to insert */
+  public NoValidValueException(String message) {
+    super(message);
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 828540fe57..99fe43b1d8 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.session;
 
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.NoValidValueException;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
@@ -766,8 +767,20 @@ public class Session {
       List<TSDataType> types,
       Object... values)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordReq request =
-        genTSInsertRecordReq(deviceId, time, measurements, types, Arrays.asList(values), false);
+    TSInsertRecordReq request;
+    try {
+      request =
+          filterAndGenTSInsertRecordReq(
+              deviceId, time, measurements, types, Arrays.asList(values), false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return;
+    }
+
     insertRecord(deviceId, request);
   }
 
@@ -915,8 +928,18 @@ public class Session {
       List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
     // not vector by default
-    TSInsertRecordReq request =
-        genTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+    TSInsertRecordReq request;
+    try {
+      request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return;
+    }
+
     insertRecord(deviceId, request);
   }
 
@@ -934,11 +957,41 @@ public class Session {
       List<TSDataType> types,
       List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordReq request =
-        genTSInsertRecordReq(deviceId, time, measurements, types, values, true);
+    TSInsertRecordReq request;
+    try {
+      request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, true);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return;
+    }
     insertRecord(deviceId, request);
   }
 
+  private TSInsertRecordReq filterAndGenTSInsertRecordReq(
+      String prefixPath,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values,
+      boolean isAligned)
+      throws IoTDBConnectionException {
+    if (!hasNull(values)) { // nothing to filter: forward the caller's lists unchanged
+      return genTSInsertRecordReq(prefixPath, time, measurements, types, values, isAligned);
+    }
+    List<String> keptNames = new ArrayList<>(measurements); // copies keep caller lists intact
+    List<Object> keptValues = new ArrayList<>(values);
+    List<TSDataType> keptTypes = new ArrayList<>(types);
+    if (filterNullValueAndMeasurement(prefixPath, keptNames, keptTypes, keptValues)) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+    return genTSInsertRecordReq(prefixPath, time, keptNames, keptTypes, keptValues, isAligned);
+  }
+  }
+
   private TSInsertRecordReq genTSInsertRecordReq(
       String prefixPath,
       long time,
@@ -967,8 +1020,17 @@ public class Session {
   public void insertRecord(
       String deviceId, long time, List<String> measurements, List<String> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertStringRecordReq request =
-        genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+    TSInsertStringRecordReq request;
+    try {
+      request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return;
+    }
     insertRecord(deviceId, request);
   }
 
@@ -982,11 +1044,38 @@ public class Session {
   public void insertAlignedRecord(
       String deviceId, long time, List<String> measurements, List<String> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertStringRecordReq request =
-        genTSInsertStringRecordReq(deviceId, time, measurements, values, true);
+    TSInsertStringRecordReq request;
+    try {
+      request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, true);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return;
+    }
     insertRecord(deviceId, request);
   }
 
+  /** Builds the string insert request after dropping null values and their measurements. */
+  private TSInsertStringRecordReq filterAndGenTSInsertStringRecordReq(
+      String prefixPath,
+      long time,
+      List<String> measurements,
+      List<String> values,
+      boolean isAligned) {
+    if (!hasNull(values)) { // nothing to filter: forward the caller's lists unchanged
+      return genTSInsertStringRecordReq(prefixPath, time, measurements, values, isAligned);
+    }
+    List<String> keptNames = new ArrayList<>(measurements); // copies keep caller lists intact
+    List<String> keptValues = new ArrayList<>(values);
+    if (filterNullValueAndMeasurementWithStringType(keptValues, prefixPath, keptNames)) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+    return genTSInsertStringRecordReq(prefixPath, time, keptNames, keptValues, isAligned);
+  }
+
   private TSInsertStringRecordReq genTSInsertStringRecordReq(
       String prefixPath,
       long time,
@@ -1025,8 +1114,19 @@ public class Session {
     if (enableCacheLeader) {
       insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
     } else {
-      TSInsertStringRecordsReq request =
-          genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, false);
+      TSInsertStringRecordsReq request;
+      try {
+        request =
+            filterAndGenTSInsertStringRecordsReq(
+                deviceIds, times, measurementsList, valuesList, false);
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+            deviceIds.toString(),
+            times.toString(),
+            measurementsList.toString());
+        return;
+      }
       try {
         defaultSessionConnection.insertRecords(request);
       } catch (RedirectException e) {
@@ -1038,6 +1138,210 @@ public class Session {
     }
   }
 
+  /**
+   * Removes null values, together with their measurement and type, from every record in place.
+   * A record whose values are all null is deleted from all five parallel lists.
+   *
+   * @param deviceIds device path of each record; times, measurementsList, valuesList and
+   *     typesList are indexed in parallel with it
+   * @throws NoValidValueException if no record is left after filtering
+   */
+  private void filterNullValueAndMeasurement(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<Object>> valuesList,
+      List<List<TSDataType>> typesList) {
+    for (int idx = valuesList.size() - 1; idx >= 0; idx--) { // backwards: removal is index-safe
+      List<Object> recordValues = valuesList.get(idx);
+      List<String> recordNames = measurementsList.get(idx);
+      List<TSDataType> recordTypes = typesList.get(idx);
+      if (!filterNullValueAndMeasurement(
+          deviceIds.get(idx), recordNames, recordTypes, recordValues)) {
+        continue; // at least one value survived, keep the record
+      }
+      valuesList.remove(idx);
+      measurementsList.remove(idx);
+      deviceIds.remove(idx);
+      times.remove(idx);
+      typesList.remove(idx);
+    }
+    if (valuesList.isEmpty()) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+  }
+
+  /**
+   * Removes null values, with their measurement and type, from every record of one device in
+   * place. A record whose values are all null is deleted from the four parallel lists.
+   *
+   * @param deviceId device all records belong to; it is passed on to the per-record filter
+   * @param times timestamp of each record, parallel to measurementsList, typesList and
+   *     valuesList
+   * @throws NoValidValueException if no record is left after filtering
+   */
+  private void filterNullValueAndMeasurementOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) {
+    // Walk backwards so that removing a record never shifts one that is still to be visited.
+    for (int idx = valuesList.size() - 1; idx >= 0; idx--) {
+      List<Object> recordValues = valuesList.get(idx);
+      List<String> recordNames = measurementsList.get(idx);
+      List<TSDataType> recordTypes = typesList.get(idx);
+      if (!filterNullValueAndMeasurement(deviceId, recordNames, recordTypes, recordValues)) {
+        continue; // at least one value survived, keep the record
+      }
+      valuesList.remove(idx);
+      measurementsList.remove(idx);
+      typesList.remove(idx);
+      times.remove(idx);
+    }
+    if (valuesList.isEmpty()) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+  }
+
+  /**
+   * Removes null string values, with their measurement, from every record of one device in
+   * place. A record whose values are all null is deleted from the three parallel lists.
+   *
+   * @param times timestamp of each record, parallel to measurementsList and valuesList
+   * @param deviceId device all records belong to; it is passed on to the per-record filter
+   * @throws NoValidValueException if no record is left after filtering
+   */
+  private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
+      List<Long> times,
+      String deviceId,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList) {
+    // Walk backwards so that removing a record never shifts one that is still to be visited.
+    for (int idx = valuesList.size() - 1; idx >= 0; idx--) {
+      List<String> recordValues = valuesList.get(idx);
+      List<String> recordNames = measurementsList.get(idx);
+      if (!filterNullValueAndMeasurementWithStringType(recordValues, deviceId, recordNames)) {
+        continue; // at least one value survived, keep the record
+      }
+      valuesList.remove(idx);
+      measurementsList.remove(idx);
+      times.remove(idx);
+    }
+    if (valuesList.isEmpty()) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+  }
+
+  /**
+   * Removes every null value, with its measurement and type, from the parallel lists in place.
+   *
+   * @param deviceId device the record belongs to; only used in the log messages
+   * @param measurementsList measurement names of one record
+   * @param types data types of one record
+   * @param valuesList values of one record; null entries are removed
+   * @return true if no value is left after filtering, false otherwise
+   */
+  private boolean filterNullValueAndMeasurement(
+      String deviceId,
+      List<String> measurementsList,
+      List<TSDataType> types,
+      List<Object> valuesList) {
+    Map<String, Object> nullMap = new HashMap<>();
+    for (int i = valuesList.size() - 1; i >= 0; i--) {
+      if (valuesList.get(i) == null) {
+        nullMap.put(measurementsList.get(i), null);
+        valuesList.remove(i);
+        measurementsList.remove(i);
+        types.remove(i);
+      }
+    }
+    boolean allNull = valuesList.isEmpty();
+    if (allNull) {
+      logger.info("All values of the {} are null,null values are {}", deviceId, nullMap.toString());
+    } else if (!nullMap.isEmpty()) { // batch callers also pass records without any null
+      logger.info("Some values of {} are null,null values are {}", deviceId, nullMap.toString());
+    }
+    return allNull;
+  }
+
+  /**
+   * Removes null string values, with their measurement, from every record of the batch in place.
+   *
+   * @param prefixPaths device path of each record
+   * @param times timestamp of each record
+   * @param measurementsList measurement names of each record
+   * @param valuesList string values of each record; an all-null record leaves all four lists
+   * @throws NoValidValueException if no record is left after filtering
+   */
+  private void filterNullValueAndMeasurementWithStringType(
+      List<String> prefixPaths,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList) {
+    for (int idx = valuesList.size() - 1; idx >= 0; idx--) { // backwards: removal is index-safe
+      List<String> recordValues = valuesList.get(idx);
+      List<String> recordNames = measurementsList.get(idx);
+      String devicePath = prefixPaths.get(idx);
+      if (!filterNullValueAndMeasurementWithStringType(recordValues, devicePath, recordNames)) {
+        continue; // at least one value survived, keep the record
+      }
+      valuesList.remove(idx);
+      measurementsList.remove(idx);
+      times.remove(idx);
+      prefixPaths.remove(idx);
+    }
+    if (valuesList.isEmpty()) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+  }
+
+  /**
+   * Removes each null value of a {@code deviceId} record, with its measurement, in place.
+   *
+   * @param valuesList string values of one record; null entries are removed
+   * @param measurementsList measurement names of one record, parallel to valuesList
+   * @return true if no value is left after filtering, false otherwise
+   */
+  private boolean filterNullValueAndMeasurementWithStringType(
+      List<String> valuesList, String deviceId, List<String> measurementsList) {
+    Map<String, Object> nullMap = new HashMap<>();
+    for (int i = valuesList.size() - 1; i >= 0; i--) {
+      if (valuesList.get(i) == null) {
+        nullMap.put(measurementsList.get(i), null);
+        valuesList.remove(i);
+        measurementsList.remove(i);
+      }
+    }
+    boolean allNull = valuesList.isEmpty();
+    if (allNull) {
+      logger.info("All values of the {} are null,null values are {}", deviceId, nullMap.toString());
+    } else if (!nullMap.isEmpty()) { // batch callers also pass records without any null
+      logger.info("Some values of {} are null,null values are {}", deviceId, nullMap.toString());
+    }
+    return allNull;
+  }
+
+  /**
+   * Tells whether the list, or any list nested inside it at any depth, contains a null element.
+   * The parameter is a wildcard list so that flat and nested value lists are both accepted.
+   *
+   * @param valuesList list to scan; elements that are lists are scanned recursively
+   * @return true if a null element is found, false otherwise
+   */
+  private boolean hasNull(List<?> valuesList) {
+    for (Object element : valuesList) {
+      if (element == null) {
+        return true;
+      }
+      // A nested list (one record of a batch) is searched recursively.
+      if (element instanceof List && hasNull((List<?>) element)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Insert aligned multiple rows, which can reduce the overhead of network. This method is just
    * like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
@@ -1062,8 +1366,20 @@ public class Session {
     if (enableCacheLeader) {
       insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
     } else {
-      TSInsertStringRecordsReq request =
-          genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, true);
+      TSInsertStringRecordsReq request;
+      try {
+        request =
+            filterAndGenTSInsertStringRecordsReq(
+                deviceIds, times, measurementsList, valuesList, true);
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+            deviceIds.toString(),
+            times.toString(),
+            measurementsList.toString());
+        return;
+      }
+
       try {
         defaultSessionConnection.insertRecords(request);
       } catch (RedirectException e) {
@@ -1088,13 +1404,77 @@ public class Session {
       TSInsertStringRecordsReq request =
           recordsGroup.computeIfAbsent(connection, k -> new TSInsertStringRecordsReq());
       request.setIsAligned(isAligned);
-      updateTSInsertStringRecordsReq(
-          request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
+      try {
+        filterAndUpdateTSInsertStringRecordsReq(
+            request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+            deviceIds.get(i),
+            times.get(i),
+            measurementsList.get(i).toString());
+        continue;
+      }
     }
 
     insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
+  private TSInsertStringRecordsReq filterAndGenTSInsertStringRecordsReq(
+      List<String> prefixPaths,
+      List<Long> time,
+      List<List<String>> measurements,
+      List<List<String>> values,
+      boolean isAligned) {
+    if (hasNull(values)) { // filter only when at least one value of the batch is null
+      values = changeToArrayListWithStringType(values); // the filter below removes in place
+      measurements = changeToArrayListWithStringType(measurements);
+      prefixPaths = new ArrayList<>(prefixPaths); // all-null records are removed from the copy
+      time = new ArrayList<>(time); // the next call throws NoValidValueException if none is left
+      filterNullValueAndMeasurementWithStringType(prefixPaths, time, measurements, values);
+    }
+    return genTSInsertStringRecordsReq(prefixPaths, time, measurements, values, isAligned);
+  }
+
+  /**
+   * Returns a mutable copy of the outer list and of every inner list, so that the in-place null
+   * filtering applied to the result never alters the lists owned by the caller.
+   */
+  private List<List<String>> changeToArrayListWithStringType(List<List<String>> values) {
+    // Copy even if the argument already is an ArrayList: it still belongs to the caller.
+    List<List<String>> copy = new ArrayList<>(values.size());
+    for (List<String> row : values) {
+      copy.add(new ArrayList<>(row));
+    }
+    return copy;
+  }
+
+  /**
+   * Returns a mutable copy of the outer list and of every inner list, so that the in-place null
+   * filtering applied to the result never alters the lists owned by the caller.
+   */
+  private List<List<Object>> changeToArrayList(List<List<Object>> values) {
+    // Copy even if the argument already is an ArrayList: it still belongs to the caller.
+    List<List<Object>> copy = new ArrayList<>(values.size());
+    for (List<Object> row : values) {
+      copy.add(new ArrayList<>(row));
+    }
+    return copy;
+  }
+
+  /**
+   * Returns a mutable copy of the outer list and of every inner list, so that the in-place null
+   * filtering applied to the result never alters the lists owned by the caller.
+   */
+  private List<List<TSDataType>> changeToArrayListWithTSDataType(List<List<TSDataType>> values) {
+    // Copy even if the argument already is an ArrayList: it still belongs to the caller.
+    List<List<TSDataType>> copy = new ArrayList<>(values.size());
+    for (List<TSDataType> row : values) {
+      copy.add(new ArrayList<>(row));
+    }
+    return copy;
+  }
+
   private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
       List<String> prefixPaths,
       List<Long> time,
@@ -1102,7 +1482,6 @@ public class Session {
       List<List<String>> values,
       boolean isAligned) {
     TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
-
     request.setPrefixPaths(prefixPaths);
     request.setTimestamps(time);
     request.setMeasurementsList(measurements);
@@ -1111,6 +1490,24 @@ public class Session {
     return request;
   }
 
+  private void filterAndUpdateTSInsertStringRecordsReq(
+      TSInsertStringRecordsReq request,
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<String> values) {
+    if (!hasNull(values)) { // nothing to filter: append the record exactly as given
+      updateTSInsertStringRecordsReq(request, deviceId, time, measurements, values);
+      return;
+    }
+    List<String> keptNames = new ArrayList<>(measurements); // copies keep caller lists intact
+    List<String> keptValues = new ArrayList<>(values);
+    if (filterNullValueAndMeasurementWithStringType(keptValues, deviceId, keptNames)) {
+      throw new NoValidValueException("All inserted data is null.");
+    }
+    updateTSInsertStringRecordsReq(request, deviceId, time, keptNames, keptValues);
+  }
+
   private void updateTSInsertStringRecordsReq(
       TSInsertStringRecordsReq request,
       String deviceId,
@@ -1148,8 +1545,19 @@ public class Session {
       insertRecordsWithLeaderCache(
           deviceIds, times, measurementsList, typesList, valuesList, false);
     } else {
-      TSInsertRecordsReq request =
-          genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, false);
+      TSInsertRecordsReq request;
+      try {
+        request =
+            filterAndGenTSInsertRecordsReq(
+                deviceIds, times, measurementsList, typesList, valuesList, false);
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+            deviceIds.toString(),
+            times.toString(),
+            measurementsList.toString());
+        return;
+      }
       try {
         defaultSessionConnection.insertRecords(request);
       } catch (RedirectException e) {
@@ -1186,8 +1594,19 @@ public class Session {
     if (enableCacheLeader) {
       insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
     } else {
-      TSInsertRecordsReq request =
-          genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, true);
+      TSInsertRecordsReq request;
+      try {
+        request =
+            filterAndGenTSInsertRecordsReq(
+                deviceIds, times, measurementsList, typesList, valuesList, true);
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+            deviceIds.toString(),
+            times.toString(),
+            measurementsList.toString());
+        return;
+      }
       try {
         defaultSessionConnection.insertRecords(request);
       } catch (RedirectException e) {
@@ -1241,9 +1660,19 @@ public class Session {
       throw new IllegalArgumentException(
           "times, measurementsList and valuesList's size should be equal");
     }
-    TSInsertRecordsOfOneDeviceReq request =
-        genTSInsertRecordsOfOneDeviceReq(
-            deviceId, times, measurementsList, typesList, valuesList, haveSorted, false);
+    TSInsertRecordsOfOneDeviceReq request;
+    try {
+      request =
+          filterAndGenTSInsertRecordsOfOneDeviceReq(
+              deviceId, times, measurementsList, typesList, valuesList, haveSorted, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+          deviceId,
+          times.toString(),
+          measurementsList.toString());
+      return;
+    }
     try {
       getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
     } catch (RedirectException e) {
@@ -1273,9 +1702,19 @@ public class Session {
       throw new IllegalArgumentException(
           "times, measurementsList and valuesList's size should be equal");
     }
-    TSInsertStringRecordsOfOneDeviceReq req =
-        genTSInsertStringRecordsOfOneDeviceReq(
-            deviceId, times, measurementsList, valuesList, haveSorted, false);
+    TSInsertStringRecordsOfOneDeviceReq req;
+    try {
+      req =
+          filterAndGenTSInsertStringRecordsOfOneDeviceReq(
+              deviceId, times, measurementsList, valuesList, haveSorted, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+          deviceId,
+          times.toString(),
+          measurementsList.toString());
+      return;
+    }
     try {
       getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
     } catch (RedirectException e) {
@@ -1343,9 +1782,19 @@ public class Session {
       throw new IllegalArgumentException(
           "times, subMeasurementsList and valuesList's size should be equal");
     }
-    TSInsertRecordsOfOneDeviceReq request =
-        genTSInsertRecordsOfOneDeviceReq(
-            deviceId, times, measurementsList, typesList, valuesList, haveSorted, true);
+    TSInsertRecordsOfOneDeviceReq request;
+    try {
+      request =
+          filterAndGenTSInsertRecordsOfOneDeviceReq(
+              deviceId, times, measurementsList, typesList, valuesList, haveSorted, true);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+          deviceId,
+          times.toString(),
+          measurementsList.toString());
+      return;
+    }
     try {
       getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
     } catch (RedirectException e) {
@@ -1375,9 +1824,19 @@ public class Session {
       throw new IllegalArgumentException(
           "times, measurementsList and valuesList's size should be equal");
     }
-    TSInsertStringRecordsOfOneDeviceReq req =
-        genTSInsertStringRecordsOfOneDeviceReq(
-            deviceId, times, measurementsList, valuesList, haveSorted, true);
+    TSInsertStringRecordsOfOneDeviceReq req;
+    try {
+      req =
+          filterAndGenTSInsertStringRecordsOfOneDeviceReq(
+              deviceId, times, measurementsList, valuesList, haveSorted, true);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]",
+          deviceId,
+          times.toString(),
+          measurementsList.toString());
+      return;
+    }
     try {
       getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
     } catch (RedirectException e) {
@@ -1404,6 +1863,27 @@ public class Session {
     insertAlignedStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList, false);
   }
 
+  private TSInsertRecordsOfOneDeviceReq filterAndGenTSInsertRecordsOfOneDeviceReq( // null-filtering front end to genTSInsertRecordsOfOneDeviceReq
+      String prefixPath,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList,
+      boolean haveSorted,
+      boolean isAligned)
+      throws IoTDBConnectionException, BatchExecutionException {
+    if (hasNull(valuesList)) { // when hasNull is false the arguments reach the generator unchanged
+      measurementsList = changeToArrayListWithStringType(measurementsList); // rebinds the parameter; presumably a mutable copy - confirm in helper
+      valuesList = changeToArrayList(valuesList);
+      typesList = changeToArrayListWithTSDataType(typesList);
+      times = new ArrayList<>(times); // new list, so the filter call below is not handed the caller's list
+      filterNullValueAndMeasurementOfOneDevice( // NOTE(review): callers catch NoValidValueException around this method; presumably raised here when every value is null - confirm
+          prefixPath, times, measurementsList, typesList, valuesList);
+    }
+    return genTSInsertRecordsOfOneDeviceReq( // haveSorted and isAligned are passed through as received
+        prefixPath, times, measurementsList, typesList, valuesList, haveSorted, isAligned);
+  }
+
   private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(
       String prefixPath,
       List<Long> times,
@@ -1419,7 +1899,6 @@ public class Session {
       throw new IllegalArgumentException(
           "times, measurementsList and valuesList's size should be equal");
     }
-
     if (!checkSorted(times)) {
       // sort
       Integer[] index = new Integer[times.size()];
@@ -1446,6 +1925,24 @@ public class Session {
     return request;
   }
 
+  private TSInsertStringRecordsOfOneDeviceReq filterAndGenTSInsertStringRecordsOfOneDeviceReq( // null-filtering front end to genTSInsertStringRecordsOfOneDeviceReq
+      String prefixPath,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList,
+      boolean haveSorted,
+      boolean isAligned) { // declares no checked exceptions
+    if (hasNull(valuesList)) { // when hasNull is false the arguments reach the generator unchanged
+      measurementsList = changeToArrayListWithStringType(measurementsList); // rebinds the parameter; presumably a mutable copy - confirm in helper
+      valuesList = changeToArrayListWithStringType(valuesList); // values are strings here, so the same helper serves both lists
+      times = new ArrayList<>(times); // new list, so the filter call below is not handed the caller's list
+      filterNullValueAndMeasurementWithStringTypeOfOneDevice( // takes times BEFORE prefixPath; NOTE(review): callers catch NoValidValueException, presumably raised here - confirm
+          times, prefixPath, measurementsList, valuesList);
+    }
+    return genTSInsertStringRecordsOfOneDeviceReq( // haveSorted and isAligned are passed through as received
+        prefixPath, times, measurementsList, valuesList, haveSorted, isAligned);
+  }
+
   private TSInsertStringRecordsOfOneDeviceReq genTSInsertStringRecordsOfOneDeviceReq(
       String prefixPath,
       List<Long> times,
@@ -1521,17 +2018,46 @@ public class Session {
       TSInsertRecordsReq request =
           recordsGroup.computeIfAbsent(connection, k -> new TSInsertRecordsReq());
       request.setIsAligned(isAligned);
-      updateTSInsertRecordsReq(
-          request,
-          deviceIds.get(i),
-          times.get(i),
-          measurementsList.get(i),
-          typesList.get(i),
-          valuesList.get(i));
+      try {
+        filterAndUpdateTSInsertRecordsReq(
+            request,
+            deviceIds.get(i),
+            times.get(i),
+            measurementsList.get(i),
+            typesList.get(i),
+            valuesList.get(i));
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]",
+            deviceIds.get(i),
+            times.get(i),
+            measurementsList.get(i).toString());
+        continue;
+      }
     }
     insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
+  private TSInsertRecordsReq filterAndGenTSInsertRecordsReq( // null-filtering front end to genTSInsertRecordsReq
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList,
+      boolean isAligned)
+      throws IoTDBConnectionException {
+    if (hasNull(valuesList)) { // when hasNull is false the arguments reach the generator unchanged
+      measurementsList = changeToArrayListWithStringType(measurementsList); // rebinds the parameter; presumably a mutable copy - confirm in helper
+      valuesList = changeToArrayList(valuesList);
+      deviceIds = new ArrayList<>(deviceIds); // new lists, so the filter call below is not handed the caller's lists
+      times = new ArrayList<>(times);
+      typesList = changeToArrayListWithTSDataType(typesList);
+      filterNullValueAndMeasurement(deviceIds, times, measurementsList, valuesList, typesList); // values precede types here, the reverse of genTSInsertRecordsReq below
+    }
+    return genTSInsertRecordsReq( // NOTE(review): callers catch NoValidValueException around this method; presumably raised by the filter helper - confirm
+        deviceIds, times, measurementsList, typesList, valuesList, isAligned);
+  }
+
   private TSInsertRecordsReq genTSInsertRecordsReq(
       List<String> deviceIds,
       List<Long> times,
@@ -1550,6 +2076,27 @@ public class Session {
     return request;
   }
 
+  private void filterAndUpdateTSInsertRecordsReq( // null-filtering front end to updateTSInsertRecordsReq for a single record
+      TSInsertRecordsReq request,
+      String deviceId,
+      Long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values)
+      throws IoTDBConnectionException {
+    if (hasNull(values)) { // when hasNull is false the record is appended to the request as given
+      measurements = new ArrayList<>(measurements); // new lists, so the filter call below is not handed the caller's lists
+      types = new ArrayList<>(types);
+      values = new ArrayList<>(values);
+      boolean isAllValuesNull = // the helper's boolean result is used as the "nothing left to insert" signal
+          filterNullValueAndMeasurement(deviceId, measurements, types, values);
+      if (isAllValuesNull) {
+        throw new NoValidValueException("All inserted data is null."); // not in the throws clause, so presumably unchecked; the caller in this file catches it, warns and skips the record
+      }
+    }
+    updateTSInsertRecordsReq(request, deviceId, time, measurements, types, values); // reached only when the throw above did not fire
+  }
+
   private void updateTSInsertRecordsReq(
       TSInsertRecordsReq request,
       String deviceId,
@@ -1819,8 +2366,20 @@ public class Session {
       List<List<String>> measurementsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertStringRecordsReq request =
-        genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, false);
+    TSInsertStringRecordsReq request;
+    try {
+      request =
+          filterAndGenTSInsertStringRecordsReq(
+              deviceIds, times, measurementsList, valuesList, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+          deviceIds.toString(),
+          times.toString(),
+          measurementsList.toString());
+      return;
+    }
+
     defaultSessionConnection.testInsertRecords(request);
   }
 
@@ -1835,8 +2394,20 @@ public class Session {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordsReq request =
-        genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, false);
+    TSInsertRecordsReq request;
+    try {
+      request =
+          filterAndGenTSInsertRecordsReq(
+              deviceIds, times, measurementsList, typesList, valuesList, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]",
+          deviceIds.toString(),
+          times.toString(),
+          measurementsList.toString());
+      return;
+    }
+
     defaultSessionConnection.testInsertRecords(request);
   }
 
@@ -1847,8 +2418,17 @@ public class Session {
   public void testInsertRecord(
       String deviceId, long time, List<String> measurements, List<String> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertStringRecordReq request =
-        genTSInsertStringRecordReq(deviceId, time, measurements, values, false);
+    TSInsertStringRecordReq request; // assigned inside the try so the all-null case can return before anything is sent
+    try {
+      request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false); // same arguments the replaced genTSInsertStringRecordReq call received
+    } catch (NoValidValueException e) { // no request was built: log and return without calling testInsertRecord on the connection
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return; // the method ends here without raising an error to the caller
+    }
     defaultSessionConnection.testInsertRecord(request);
   }
 
@@ -1863,8 +2443,17 @@ public class Session {
       List<TSDataType> types,
       List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordReq request =
-        genTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+    TSInsertRecordReq request;
+    try {
+      request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
+    } catch (NoValidValueException e) {
+      logger.warn(
+          "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
+          deviceId,
+          time,
+          measurements.toString());
+      return;
+    }
     defaultSessionConnection.testInsertRecord(request);
   }