You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/01/12 14:51:38 UTC

(iotdb) branch master updated: Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b09a5adcae Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877)
7b09a5adcae is described below

commit 7b09a5adcaeb5ae4b96a9a9cbf3508829997b6af
Author: V_Galaxy <dy...@gmail.com>
AuthorDate: Fri Jan 12 22:51:31 2024 +0800

    Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java  |  46 ---
 .../apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java | 320 +++++++++++++++++++++
 .../tablet/TabletInsertionDataContainer.java       | 152 +++++++---
 .../pipe/event/PipeTabletInsertionEventTest.java   |  93 +++---
 4 files changed, 490 insertions(+), 121 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
index 65eb2a20f28..043f24cf3ef 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
@@ -33,7 +33,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -135,49 +134,4 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualIT {
           Collections.singleton("0,1.0,"));
     }
   }
-
-  @Test
-  public void testInsertNull() throws Exception {
-    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
-
-    String receiverIp = receiverDataNode.getIp();
-    int receiverPort = receiverDataNode.getPort();
-
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      Map<String, String> extractorAttributes = new HashMap<>();
-      Map<String, String> processorAttributes = new HashMap<>();
-      Map<String, String> connectorAttributes = new HashMap<>();
-
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
-
-      TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("testPipe", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
-
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
-
-      if (!TestUtils.tryExecuteNonQueriesWithRetry(
-          receiverEnv,
-          Arrays.asList(
-              "create aligned timeseries root.sg.d1(s0 float, s1 float)",
-              "create aligned timeseries root.sg.d1(s0 float, s1 float)",
-              "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) {
-        return;
-      }
-
-      TestUtils.assertDataOnEnv(
-          receiverEnv,
-          "select * from root.**",
-          "Time,root.sg.d1.s0,root.sg.d1.s1,",
-          Collections.singleton("3,null,25.34,"));
-    }
-  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java
new file mode 100644
index 00000000000..46d649690ac
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeNullValueIT extends AbstractPipeDualIT {
+
+  // Test dimensions:
+  // 1. is or not aligned
+  // 2. is or not parsed
+  // 3. session insertRecord, session insertTablet, SQL insert
+  // 4. partial null, all null
+  // 5. one row or more (TODO)
+  // 6. more data types (TODO)
+
+  private enum InsertType {
+    SESSION_INSERT_RECORD,
+    SESSION_INSERT_TABLET,
+    SQL_INSERT,
+  }
+
+  private static final Map<InsertType, Consumer<Boolean>> INSERT_NULL_VALUE_MAP = new HashMap<>();
+
+  private static final List<String> CREATE_TIMESERIES_SQL =
+      Arrays.asList(
+          "create timeseries root.sg.d1.s0 with datatype=float",
+          "create timeseries root.sg.d1.s1 with datatype=float");
+
+  private static final List<String> CREATE_ALIGNED_TIMESERIES_SQL =
+      Collections.singletonList("create aligned timeseries root.sg.d1(s0 float, s1 float)");
+
+  private final String deviceId = "root.sg.d1";
+  private final List<String> measurements = Arrays.asList("s0", "s1");
+  private final List<TSDataType> types = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
+
+  private final List<Object> partialNullValues = Arrays.asList(null, 25.34F);
+  private final List<Object> allNullValues = Arrays.asList(null, null);
+
+  private Tablet partialNullTablet;
+  private Tablet allNullTablet;
+
+  private void constructTablet() {
+    final MeasurementSchema[] schemas = new MeasurementSchema[2];
+    for (int i = 0; i < schemas.length; i++) {
+      schemas[i] = new MeasurementSchema(measurements.get(i), types.get(i));
+    }
+
+    final BitMap[] bitMapsForPartialNull = new BitMap[2];
+    bitMapsForPartialNull[0] = new BitMap(1);
+    bitMapsForPartialNull[0].markAll();
+    bitMapsForPartialNull[1] = new BitMap(1);
+
+    final BitMap[] bitMapsForAllNull = new BitMap[2];
+    bitMapsForAllNull[0] = new BitMap(1);
+    bitMapsForAllNull[0].markAll();
+    bitMapsForAllNull[1] = new BitMap(1);
+    bitMapsForAllNull[1].markAll();
+
+    final Object[] valuesForPartialNull = new Object[2];
+    valuesForPartialNull[0] = new float[] {0F};
+    valuesForPartialNull[1] = new float[] {25.34F};
+
+    final Object[] valuesForAllNull = new Object[2];
+    valuesForAllNull[0] = new float[] {0F};
+    valuesForAllNull[1] = new float[] {0F};
+
+    partialNullTablet = new Tablet(deviceId, Arrays.asList(schemas), 1);
+    partialNullTablet.values = valuesForPartialNull;
+    partialNullTablet.timestamps = new long[] {3};
+    partialNullTablet.rowSize = 1;
+    partialNullTablet.bitMaps = bitMapsForPartialNull;
+
+    allNullTablet = new Tablet(deviceId, Arrays.asList(schemas), 1);
+    allNullTablet.values = valuesForAllNull;
+    allNullTablet.timestamps = new long[] {4};
+    allNullTablet.rowSize = 1;
+    allNullTablet.bitMaps = bitMapsForAllNull;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws PipeEnvironmentException {
+    super.setUp();
+
+    constructTablet();
+
+    // init INSERT_NULL_VALUE_MAP
+    INSERT_NULL_VALUE_MAP.put(
+        InsertType.SESSION_INSERT_RECORD,
+        (isAligned) -> {
+          try {
+            try (ISession session = senderEnv.getSessionConnection()) {
+              if (isAligned) {
+                session.insertAlignedRecord(deviceId, 3, measurements, types, partialNullValues);
+                session.insertAlignedRecord(deviceId, 4, measurements, types, allNullValues);
+              } else {
+                session.insertRecord(deviceId, 3, measurements, types, partialNullValues);
+                session.insertRecord(deviceId, 4, measurements, types, allNullValues);
+              }
+            } catch (StatementExecutionException e) {
+              fail(e.getMessage());
+            }
+          } catch (IoTDBConnectionException e) {
+            fail(e.getMessage());
+          }
+        });
+
+    INSERT_NULL_VALUE_MAP.put(
+        InsertType.SESSION_INSERT_TABLET,
+        (isAligned) -> {
+          try {
+            try (ISession session = senderEnv.getSessionConnection()) {
+              if (isAligned) {
+                session.insertAlignedTablet(partialNullTablet);
+                session.insertAlignedTablet(allNullTablet);
+              } else {
+                session.insertTablet(partialNullTablet);
+                session.insertTablet(allNullTablet);
+              }
+            } catch (StatementExecutionException e) {
+              fail(e.getMessage());
+            }
+          } catch (IoTDBConnectionException e) {
+            fail(e.getMessage());
+          }
+        });
+
+    INSERT_NULL_VALUE_MAP.put(
+        InsertType.SQL_INSERT,
+        (isAligned) -> {
+          // partial null
+          if (!TestUtils.tryExecuteNonQueriesWithRetry(
+              senderEnv,
+              isAligned
+                  ? Collections.singletonList(
+                      "insert into root.sg.d1(time, s0, s1) aligned values (3, null, 25.34)")
+                  : Collections.singletonList(
+                      "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) {
+            fail();
+          }
+          // all null
+          if (!TestUtils.tryExecuteNonQueriesWithRetry(
+              senderEnv,
+              isAligned
+                  ? Collections.singletonList(
+                      "insert into root.sg.d1(time, s0, s1) aligned values (4, null, null)")
+                  : Collections.singletonList(
+                      "insert into root.sg.d1(time, s0, s1) values (4, null, null)"))) {
+            fail();
+          }
+        });
+  }
+
+  private void testInsertNullValueTemplate(
+      InsertType insertType, boolean isAligned, boolean withParsing) throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      if (withParsing) {
+        extractorAttributes.put("extractor.pattern", "root.sg.d1");
+      }
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("test", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        receiverEnv, isAligned ? CREATE_ALIGNED_TIMESERIES_SQL : CREATE_TIMESERIES_SQL)) {
+      fail();
+    }
+
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        senderEnv, isAligned ? CREATE_ALIGNED_TIMESERIES_SQL : CREATE_TIMESERIES_SQL)) {
+      fail();
+    }
+
+    INSERT_NULL_VALUE_MAP.get(insertType).accept(isAligned);
+
+    TestUtils.assertDataOnEnv(
+        receiverEnv,
+        "select count(*) from root.**",
+        "count(root.sg.d1.s0),count(root.sg.d1.s1),",
+        Collections.singleton("0,1,"));
+  }
+
+  // ---------------------- //
+  // Scenario 1: SQL Insert //
+  // ---------------------- //
+  @Test
+  public void testSQLInsertWithParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SQL_INSERT, false, true);
+  }
+
+  @Test
+  public void testSQLInsertWithoutParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SQL_INSERT, false, false);
+  }
+
+  @Test
+  public void testSQLInsertAlignedWithParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SQL_INSERT, true, true);
+  }
+
+  @Test
+  public void testSQLInsertAlignedWithoutParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SQL_INSERT, true, false);
+  }
+
+  // --------------------------------- //
+  // Scenario 2: Session Insert Record //
+  // --------------------------------- //
+  @Test
+  public void testSessionInsertRecordWithParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, false, true);
+  }
+
+  @Test
+  public void testSessionInsertRecordWithoutParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, false, false);
+  }
+
+  @Test
+  public void testSessionInsertRecordAlignedWithParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, true, true);
+  }
+
+  @Test
+  public void testSessionInsertRecordAlignedWithoutParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, true, false);
+  }
+
+  // --------------------------------- //
+  // Scenario 3: Session Insert Tablet //
+  // --------------------------------- //
+  @Test
+  public void testSessionInsertTabletWithParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, true);
+  }
+
+  @Test
+  public void testSessionInsertTabletWithoutParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, false);
+  }
+
+  @Test
+  public void testSessionInsertTabletAlignedWithParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, true, true);
+  }
+
+  @Test
+  public void testSessionInsertTabletAlignedWithoutParsing() throws Exception {
+    testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, true, false);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 222e135335a..6fb80a88788 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,22 +160,33 @@ public class TabletInsertionDataContainer {
         this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
-        this.valueColumns[filteredColumnIndex] =
-            filterValueColumnsByRowIndexList(
-                originValueColumnTypes[i], originValueColumns[i], rowIndexList, true);
-        this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
+        BitMap bitMap = new BitMap(this.timestampColumn.length);
+        if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) {
+          this.valueColumns[filteredColumnIndex] = null;
+          bitMap.markAll();
+        } else {
+          this.valueColumns[filteredColumnIndex] =
+              filterValueColumnsByRowIndexList(
+                  originValueColumnTypes[i],
+                  originValueColumns[i],
+                  rowIndexList,
+                  true,
+                  bitMap, // use the output bitmap since there is no bitmap in InsertRowNode
+                  bitMap);
+        }
+        this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
       }
     }
 
-    rowCount = rowIndexList.size();
-    if (rowCount == 0 && LOGGER.isDebugEnabled()) {
+    this.rowCount = this.timestampColumn.length;
+    if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "InsertRowNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
           insertRowNode,
           pattern,
-          sourceEvent.getStartTime(),
-          sourceEvent.getEndTime(),
-          sourceEvent);
+          this.sourceEvent.getStartTime(),
+          this.sourceEvent.getEndTime(),
+          this.sourceEvent);
     }
   }
 
@@ -186,6 +198,7 @@ public class TabletInsertionDataContainer {
     this.isAligned = insertTabletNode.isAligned();
 
     final long[] originTimestampColumn = insertTabletNode.getTimes();
+    final int originRowSize = originTimestampColumn.length;
     List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn);
     this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray();
 
@@ -215,12 +228,12 @@ public class TabletInsertionDataContainer {
         (insertTabletNode.getBitMaps() == null
             ? IntStream.range(0, originColumnSize)
                 .boxed()
-                .map(o -> new BitMap(timestampColumn.length))
+                .map(o -> new BitMap(originRowSize))
                 .toArray(BitMap[]::new)
             : insertTabletNode.getBitMaps());
     for (int i = 0; i < originBitMapList.length; i++) {
       if (originBitMapList[i] == null) {
-        originBitMapList[i] = new BitMap(timestampColumn.length);
+        originBitMapList[i] = new BitMap(originRowSize);
       }
     }
 
@@ -230,14 +243,25 @@ public class TabletInsertionDataContainer {
         this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
-        this.valueColumns[filteredColumnIndex] =
-            filterValueColumnsByRowIndexList(
-                originValueColumnTypes[i], originValueColumns[i], rowIndexList, false);
-        this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+        BitMap bitMap = new BitMap(this.timestampColumn.length);
+        if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) {
+          this.valueColumns[filteredColumnIndex] = null;
+          bitMap.markAll();
+        } else {
+          this.valueColumns[filteredColumnIndex] =
+              filterValueColumnsByRowIndexList(
+                  originValueColumnTypes[i],
+                  originValueColumns[i],
+                  rowIndexList,
+                  false,
+                  originBitMapList[i],
+                  bitMap);
+        }
+        this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
       }
     }
 
-    rowCount = timestampColumn.length;
+    this.rowCount = this.timestampColumn.length;
     if (rowCount == 0 && LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "InsertTabletNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
@@ -256,7 +280,9 @@ public class TabletInsertionDataContainer {
     this.deviceId = tablet.deviceId;
     this.isAligned = isAligned;
 
-    final long[] originTimestampColumn = tablet.timestamps;
+    final long[] originTimestampColumn =
+        Arrays.copyOf(
+            tablet.timestamps, tablet.rowSize); // tablet.timestamps.length == tablet.maxRowNumber
     List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn);
     this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray();
 
@@ -287,17 +313,18 @@ public class TabletInsertionDataContainer {
       originColumnNameStringList[i] = originMeasurementSchemaList.get(i).getMeasurementId();
       originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
     }
-    final Object[] originValueColumns = tablet.values;
+    final Object[] originValueColumns =
+        tablet.values; // we do not reduce value columns here by origin row size
     final BitMap[] originBitMapList =
         tablet.bitMaps == null
             ? IntStream.range(0, originColumnSize)
                 .boxed()
-                .map(o -> new BitMap(timestampColumn.length))
+                .map(o -> new BitMap(tablet.getMaxRowNumber()))
                 .toArray(BitMap[]::new)
-            : tablet.bitMaps;
+            : tablet.bitMaps; // we do not reduce bitmaps here by origin row size
     for (int i = 0; i < originBitMapList.length; i++) {
       if (originBitMapList[i] == null) {
-        originBitMapList[i] = new BitMap(timestampColumn.length);
+        originBitMapList[i] = new BitMap(tablet.getMaxRowNumber());
       }
     }
 
@@ -307,22 +334,33 @@ public class TabletInsertionDataContainer {
         this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i);
         this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
-        this.valueColumns[filteredColumnIndex] =
-            filterValueColumnsByRowIndexList(
-                originValueColumnTypes[i], originValueColumns[i], rowIndexList, false);
-        this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+        BitMap bitMap = new BitMap(this.timestampColumn.length);
+        if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) {
+          this.valueColumns[filteredColumnIndex] = null;
+          bitMap.markAll();
+        } else {
+          this.valueColumns[filteredColumnIndex] =
+              filterValueColumnsByRowIndexList(
+                  originValueColumnTypes[i],
+                  originValueColumns[i],
+                  rowIndexList,
+                  false,
+                  originBitMapList[i],
+                  bitMap);
+        }
+        this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
       }
     }
 
-    rowCount = tablet.rowSize;
-    if (rowCount == 0 && LOGGER.isDebugEnabled()) {
+    this.rowCount = this.timestampColumn.length;
+    if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "Tablet({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
           tablet,
           pattern,
-          sourceEvent.getStartTime(),
-          sourceEvent.getEndTime(),
-          sourceEvent);
+          this.sourceEvent.getStartTime(),
+          this.sourceEvent.getEndTime(),
+          this.sourceEvent);
     }
   }
 
@@ -394,10 +432,12 @@ public class TabletInsertionDataContainer {
   }
 
   private static Object filterValueColumnsByRowIndexList(
-      TSDataType type,
-      Object originValueColumn,
-      List<Integer> rowIndexList,
-      boolean isSingleOriginValueColumn) {
+      @NonNull TSDataType type,
+      @NonNull Object originValueColumn,
+      @NonNull List<Integer> rowIndexList,
+      boolean isSingleOriginValueColumn,
+      @NonNull BitMap originNullValueColumnBitmap,
+      @NonNull BitMap nullValueColumnBitmap /* output parameters */) {
     switch (type) {
       case INT32:
         {
@@ -407,7 +447,12 @@ public class TabletInsertionDataContainer {
                   : (int[]) originValueColumn;
           int[] valueColumns = new int[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            valueColumns[i] = intValueColumns[rowIndexList.get(i)];
+            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              valueColumns[i] = 0;
+              nullValueColumnBitmap.mark(i);
+            } else {
+              valueColumns[i] = intValueColumns[rowIndexList.get(i)];
+            }
           }
           return valueColumns;
         }
@@ -419,7 +464,12 @@ public class TabletInsertionDataContainer {
                   : (long[]) originValueColumn;
           long[] valueColumns = new long[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            valueColumns[i] = longValueColumns[rowIndexList.get(i)];
+            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              valueColumns[i] = 0L;
+              nullValueColumnBitmap.mark(i);
+            } else {
+              valueColumns[i] = longValueColumns[rowIndexList.get(i)];
+            }
           }
           return valueColumns;
         }
@@ -431,7 +481,12 @@ public class TabletInsertionDataContainer {
                   : (float[]) originValueColumn;
           float[] valueColumns = new float[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            valueColumns[i] = floatValueColumns[rowIndexList.get(i)];
+            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              valueColumns[i] = 0F;
+              nullValueColumnBitmap.mark(i);
+            } else {
+              valueColumns[i] = floatValueColumns[rowIndexList.get(i)];
+            }
           }
           return valueColumns;
         }
@@ -443,7 +498,12 @@ public class TabletInsertionDataContainer {
                   : (double[]) originValueColumn;
           double[] valueColumns = new double[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            valueColumns[i] = doubleValueColumns[rowIndexList.get(i)];
+            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              valueColumns[i] = 0D;
+              nullValueColumnBitmap.mark(i);
+            } else {
+              valueColumns[i] = doubleValueColumns[rowIndexList.get(i)];
+            }
           }
           return valueColumns;
         }
@@ -455,7 +515,12 @@ public class TabletInsertionDataContainer {
                   : (boolean[]) originValueColumn;
           boolean[] valueColumns = new boolean[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            valueColumns[i] = booleanValueColumns[rowIndexList.get(i)];
+            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              valueColumns[i] = false;
+              nullValueColumnBitmap.mark(i);
+            } else {
+              valueColumns[i] = booleanValueColumns[rowIndexList.get(i)];
+            }
           }
           return valueColumns;
         }
@@ -467,7 +532,14 @@ public class TabletInsertionDataContainer {
                   : (Binary[]) originValueColumn;
           Binary[] valueColumns = new Binary[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            valueColumns[i] = new Binary(binaryValueColumns[rowIndexList.get(i)].getValues());
+            if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)])
+                || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues())
+                || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              valueColumns[i] = Binary.EMPTY_VALUE;
+              nullValueColumnBitmap.mark(i);
+            } else {
+              valueColumns[i] = new Binary(binaryValueColumns[rowIndexList.get(i)].getValues());
+            }
           }
           return valueColumns;
         }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 46543ceabf0..011d26f2ff0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -75,18 +75,18 @@ public class PipeTabletInsertionEventTest {
   }
 
   private void createMeasurementSchema() {
-    for (int i = 0; i < 6; i++) {
+    for (int i = 0; i < schemas.length; i++) {
       schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]);
     }
   }
 
   private void createInsertRowNode() throws IllegalPathException {
-    final Object[] values = new Object[6];
+    final Object[] values = new Object[schemas.length];
 
     values[0] = 100;
     values[1] = 10000L;
     values[2] = 2F;
-    values[3] = 1.0;
+    values[3] = 1D;
     values[4] = false;
     values[5] = BytesUtils.valueOf("text");
 
@@ -116,20 +116,20 @@ public class PipeTabletInsertionEventTest {
   }
 
   private void createInsertTabletNode() throws IllegalPathException {
-    final Object[] values = new Object[6];
+    final Object[] values = new Object[schemas.length];
 
-    values[0] = new int[5];
-    values[1] = new long[5];
-    values[2] = new float[5];
-    values[3] = new double[5];
-    values[4] = new boolean[5];
-    values[5] = new Binary[5];
+    values[0] = new int[times.length];
+    values[1] = new long[times.length];
+    values[2] = new float[times.length];
+    values[3] = new double[times.length];
+    values[4] = new boolean[times.length];
+    values[5] = new Binary[times.length];
 
-    for (int r = 0; r < 5; r++) {
+    for (int r = 0; r < times.length; r++) {
       ((int[]) values[0])[r] = 100;
-      ((long[]) values[1])[r] = 10000;
-      ((float[]) values[2])[r] = 2;
-      ((double[]) values[3])[r] = 1.0;
+      ((long[]) values[1])[r] = 10000L;
+      ((float[]) values[2])[r] = 2F;
+      ((double[]) values[3])[r] = 1D;
       ((boolean[]) values[4])[r] = false;
       ((Binary[]) values[5])[r] = BytesUtils.valueOf("text");
     }
@@ -162,11 +162,11 @@ public class PipeTabletInsertionEventTest {
   }
 
   private void createTablet() {
-    final Object[] values = new Object[6];
+    final Object[] values = new Object[schemas.length];
 
     // create tablet for insertRowNode
-    BitMap[] bitMapsForInsertRowNode = new BitMap[6];
-    for (int i = 0; i < 6; i++) {
+    BitMap[] bitMapsForInsertRowNode = new BitMap[schemas.length];
+    for (int i = 0; i < schemas.length; i++) {
       bitMapsForInsertRowNode[i] = new BitMap(1);
     }
 
@@ -179,9 +179,9 @@ public class PipeTabletInsertionEventTest {
 
     for (int r = 0; r < 1; r++) {
       ((int[]) values[0])[r] = 100;
-      ((long[]) values[1])[r] = 10000;
-      ((float[]) values[2])[r] = 2;
-      ((double[]) values[3])[r] = 1.0;
+      ((long[]) values[1])[r] = 10000L;
+      ((float[]) values[2])[r] = 2F;
+      ((double[]) values[3])[r] = 1D;
       ((boolean[]) values[4])[r] = false;
       ((Binary[]) values[5])[r] = BytesUtils.valueOf("text");
     }
@@ -193,26 +193,27 @@ public class PipeTabletInsertionEventTest {
     tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode;
 
     // create tablet for insertTabletNode
-    BitMap[] bitMapsForInsertTabletNode = new BitMap[6];
-    for (int i = 0; i < 6; i++) {
+    BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length];
+    for (int i = 0; i < schemas.length; i++) {
       bitMapsForInsertTabletNode[i] = new BitMap(times.length);
     }
 
-    values[0] = new int[5];
-    values[1] = new long[5];
-    values[2] = new float[5];
-    values[3] = new double[5];
-    values[4] = new boolean[5];
-    values[5] = new Binary[5];
+    values[0] = new int[times.length];
+    values[1] = new long[times.length];
+    values[2] = new float[times.length];
+    values[3] = new double[times.length];
+    values[4] = new boolean[times.length];
+    values[5] = new Binary[times.length];
 
-    for (int r = 0; r < 5; r++) {
+    for (int r = 0; r < times.length; r++) {
       ((int[]) values[0])[r] = 100;
-      ((long[]) values[1])[r] = 10000;
-      ((float[]) values[2])[r] = 2;
-      ((double[]) values[3])[r] = 1.0;
+      ((long[]) values[1])[r] = 10000L;
+      ((float[]) values[2])[r] = 2F;
+      ((double[]) values[3])[r] = 1D;
       ((boolean[]) values[4])[r] = false;
       ((Binary[]) values[5])[r] = BytesUtils.valueOf("text");
     }
+
     tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length);
     tabletForInsertTabletNode.values = values;
     tabletForInsertTabletNode.timestamps = times;
@@ -294,13 +295,35 @@ public class PipeTabletInsertionEventTest {
     TabletInsertionDataContainer container2 =
         new TabletInsertionDataContainer(
             null,
-            new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L),
-            insertTabletNode,
+            new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 110L),
+            insertRowNode,
             pattern);
     Tablet tablet2 = container2.convertToTablet();
-    Assert.assertEquals(3, tablet2.rowSize);
+    Assert.assertEquals(1, tablet2.rowSize);
     boolean isAligned2 = container2.isAligned();
     Assert.assertFalse(isAligned2);
+
+    TabletInsertionDataContainer container3 =
+        new TabletInsertionDataContainer(
+            null,
+            new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L),
+            insertTabletNode,
+            pattern);
+    Tablet tablet3 = container3.convertToTablet();
+    Assert.assertEquals(3, tablet3.rowSize);
+    boolean isAligned3 = container3.isAligned();
+    Assert.assertFalse(isAligned3);
+
+    TabletInsertionDataContainer container4 =
+        new TabletInsertionDataContainer(
+            null,
+            new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, Long.MIN_VALUE, 109L),
+            insertTabletNode,
+            pattern);
+    Tablet tablet4 = container4.convertToTablet();
+    Assert.assertEquals(0, tablet4.rowSize);
+    boolean isAligned4 = container4.isAligned();
+    Assert.assertFalse(isAligned4);
   }
 
   @Test