You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/01/12 14:51:38 UTC
(iotdb) branch master updated: Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7b09a5adcae Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877)
7b09a5adcae is described below
commit 7b09a5adcaeb5ae4b96a9a9cbf3508829997b6af
Author: V_Galaxy <dy...@gmail.com>
AuthorDate: Fri Jan 12 22:51:31 2024 +0800
Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java | 46 ---
.../apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java | 320 +++++++++++++++++++++
.../tablet/TabletInsertionDataContainer.java | 152 +++++++---
.../pipe/event/PipeTabletInsertionEventTest.java | 93 +++---
4 files changed, 490 insertions(+), 121 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
index 65eb2a20f28..043f24cf3ef 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
@@ -33,7 +33,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -135,49 +134,4 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualIT {
Collections.singleton("0,1.0,"));
}
}
-
- @Test
- public void testInsertNull() throws Exception {
- DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
-
- String receiverIp = receiverDataNode.getIp();
- int receiverPort = receiverDataNode.getPort();
-
- try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- Map<String, String> extractorAttributes = new HashMap<>();
- Map<String, String> processorAttributes = new HashMap<>();
- Map<String, String> connectorAttributes = new HashMap<>();
-
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port", Integer.toString(receiverPort));
-
- TSStatus status =
- client.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes));
-
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
-
- if (!TestUtils.tryExecuteNonQueriesWithRetry(
- receiverEnv,
- Arrays.asList(
- "create aligned timeseries root.sg.d1(s0 float, s1 float)",
- "create aligned timeseries root.sg.d1(s0 float, s1 float)",
- "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) {
- return;
- }
-
- TestUtils.assertDataOnEnv(
- receiverEnv,
- "select * from root.**",
- "Time,root.sg.d1.s0,root.sg.d1.s1,",
- Collections.singleton("3,null,25.34,"));
- }
- }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java
new file mode 100644
index 00000000000..46d649690ac
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeNullValueIT extends AbstractPipeDualIT {
+
+ // Test dimensions:
+ // 1. is or not aligned
+ // 2. is or not parsed
+ // 3. session insertRecord, session insertTablet, SQL insert
+ // 4. partial null, all null
+ // 5. one row or more (TODO)
+ // 6. more data types (TODO)
+
+ private enum InsertType {
+ SESSION_INSERT_RECORD,
+ SESSION_INSERT_TABLET,
+ SQL_INSERT,
+ }
+
+ private static final Map<InsertType, Consumer<Boolean>> INSERT_NULL_VALUE_MAP = new HashMap<>();
+
+ private static final List<String> CREATE_TIMESERIES_SQL =
+ Arrays.asList(
+ "create timeseries root.sg.d1.s0 with datatype=float",
+ "create timeseries root.sg.d1.s1 with datatype=float");
+
+ private static final List<String> CREATE_ALIGNED_TIMESERIES_SQL =
+ Collections.singletonList("create aligned timeseries root.sg.d1(s0 float, s1 float)");
+
+ private final String deviceId = "root.sg.d1";
+ private final List<String> measurements = Arrays.asList("s0", "s1");
+ private final List<TSDataType> types = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
+
+ private final List<Object> partialNullValues = Arrays.asList(null, 25.34F);
+ private final List<Object> allNullValues = Arrays.asList(null, null);
+
+ private Tablet partialNullTablet;
+ private Tablet allNullTablet;
+
+ private void constructTablet() {
+ final MeasurementSchema[] schemas = new MeasurementSchema[2];
+ for (int i = 0; i < schemas.length; i++) {
+ schemas[i] = new MeasurementSchema(measurements.get(i), types.get(i));
+ }
+
+ final BitMap[] bitMapsForPartialNull = new BitMap[2];
+ bitMapsForPartialNull[0] = new BitMap(1);
+ bitMapsForPartialNull[0].markAll();
+ bitMapsForPartialNull[1] = new BitMap(1);
+
+ final BitMap[] bitMapsForAllNull = new BitMap[2];
+ bitMapsForAllNull[0] = new BitMap(1);
+ bitMapsForAllNull[0].markAll();
+ bitMapsForAllNull[1] = new BitMap(1);
+ bitMapsForAllNull[1].markAll();
+
+ final Object[] valuesForPartialNull = new Object[2];
+ valuesForPartialNull[0] = new float[] {0F};
+ valuesForPartialNull[1] = new float[] {25.34F};
+
+ final Object[] valuesForAllNull = new Object[2];
+ valuesForAllNull[0] = new float[] {0F};
+ valuesForAllNull[1] = new float[] {0F};
+
+ partialNullTablet = new Tablet(deviceId, Arrays.asList(schemas), 1);
+ partialNullTablet.values = valuesForPartialNull;
+ partialNullTablet.timestamps = new long[] {3};
+ partialNullTablet.rowSize = 1;
+ partialNullTablet.bitMaps = bitMapsForPartialNull;
+
+ allNullTablet = new Tablet(deviceId, Arrays.asList(schemas), 1);
+ allNullTablet.values = valuesForAllNull;
+ allNullTablet.timestamps = new long[] {4};
+ allNullTablet.rowSize = 1;
+ allNullTablet.bitMaps = bitMapsForAllNull;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws PipeEnvironmentException {
+ super.setUp();
+
+ constructTablet();
+
+ // init INSERT_NULL_VALUE_MAP
+ INSERT_NULL_VALUE_MAP.put(
+ InsertType.SESSION_INSERT_RECORD,
+ (isAligned) -> {
+ try {
+ try (ISession session = senderEnv.getSessionConnection()) {
+ if (isAligned) {
+ session.insertAlignedRecord(deviceId, 3, measurements, types, partialNullValues);
+ session.insertAlignedRecord(deviceId, 4, measurements, types, allNullValues);
+ } else {
+ session.insertRecord(deviceId, 3, measurements, types, partialNullValues);
+ session.insertRecord(deviceId, 4, measurements, types, allNullValues);
+ }
+ } catch (StatementExecutionException e) {
+ fail(e.getMessage());
+ }
+ } catch (IoTDBConnectionException e) {
+ fail(e.getMessage());
+ }
+ });
+
+ INSERT_NULL_VALUE_MAP.put(
+ InsertType.SESSION_INSERT_TABLET,
+ (isAligned) -> {
+ try {
+ try (ISession session = senderEnv.getSessionConnection()) {
+ if (isAligned) {
+ session.insertAlignedTablet(partialNullTablet);
+ session.insertAlignedTablet(allNullTablet);
+ } else {
+ session.insertTablet(partialNullTablet);
+ session.insertTablet(allNullTablet);
+ }
+ } catch (StatementExecutionException e) {
+ fail(e.getMessage());
+ }
+ } catch (IoTDBConnectionException e) {
+ fail(e.getMessage());
+ }
+ });
+
+ INSERT_NULL_VALUE_MAP.put(
+ InsertType.SQL_INSERT,
+ (isAligned) -> {
+ // partial null
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ isAligned
+ ? Collections.singletonList(
+ "insert into root.sg.d1(time, s0, s1) aligned values (3, null, 25.34)")
+ : Collections.singletonList(
+ "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) {
+ fail();
+ }
+ // all null
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ isAligned
+ ? Collections.singletonList(
+ "insert into root.sg.d1(time, s0, s1) aligned values (4, null, null)")
+ : Collections.singletonList(
+ "insert into root.sg.d1(time, s0, s1) values (4, null, null)"))) {
+ fail();
+ }
+ });
+ }
+
+ private void testInsertNullValueTemplate(
+ InsertType insertType, boolean isAligned, boolean withParsing) throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ if (withParsing) {
+ extractorAttributes.put("extractor.pattern", "root.sg.d1");
+ }
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("test", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ receiverEnv, isAligned ? CREATE_ALIGNED_TIMESERIES_SQL : CREATE_TIMESERIES_SQL)) {
+ fail();
+ }
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, isAligned ? CREATE_ALIGNED_TIMESERIES_SQL : CREATE_TIMESERIES_SQL)) {
+ fail();
+ }
+
+ INSERT_NULL_VALUE_MAP.get(insertType).accept(isAligned);
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.sg.d1.s0),count(root.sg.d1.s1),",
+ Collections.singleton("0,1,"));
+ }
+
+ // ---------------------- //
+ // Scenario 1: SQL Insert //
+ // ---------------------- //
+ @Test
+ public void testSQLInsertWithParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SQL_INSERT, false, true);
+ }
+
+ @Test
+ public void testSQLInsertWithoutParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SQL_INSERT, false, false);
+ }
+
+ @Test
+ public void testSQLInsertAlignedWithParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SQL_INSERT, true, true);
+ }
+
+ @Test
+ public void testSQLInsertAlignedWithoutParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SQL_INSERT, true, false);
+ }
+
+ // --------------------------------- //
+ // Scenario 2: Session Insert Record //
+ // --------------------------------- //
+ @Test
+ public void testSessionInsertRecordWithParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, false, true);
+ }
+
+ @Test
+ public void testSessionInsertRecordWithoutParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, false, false);
+ }
+
+ @Test
+ public void testSessionInsertRecordAlignedWithParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, true, true);
+ }
+
+ @Test
+ public void testSessionInsertRecordAlignedWithoutParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, true, false);
+ }
+
+ // --------------------------------- //
+ // Scenario 3: Session Insert Tablet //
+ // --------------------------------- //
+ @Test
+ public void testSessionInsertTabletWithParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, true);
+ }
+
+ @Test
+ public void testSessionInsertTabletWithoutParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, false);
+ }
+
+ @Test
+ public void testSessionInsertTabletAlignedWithParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, true, true);
+ }
+
+ @Test
+ public void testSessionInsertTabletAlignedWithoutParsing() throws Exception {
+ testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, true, false);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 222e135335a..6fb80a88788 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,22 +160,33 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
- this.valueColumns[filteredColumnIndex] =
- filterValueColumnsByRowIndexList(
- originValueColumnTypes[i], originValueColumns[i], rowIndexList, true);
- this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
+ BitMap bitMap = new BitMap(this.timestampColumn.length);
+ if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) {
+ this.valueColumns[filteredColumnIndex] = null;
+ bitMap.markAll();
+ } else {
+ this.valueColumns[filteredColumnIndex] =
+ filterValueColumnsByRowIndexList(
+ originValueColumnTypes[i],
+ originValueColumns[i],
+ rowIndexList,
+ true,
+ bitMap, // use the output bitmap since there is no bitmap in InsertRowNode
+ bitMap);
+ }
+ this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
}
}
- rowCount = rowIndexList.size();
- if (rowCount == 0 && LOGGER.isDebugEnabled()) {
+ this.rowCount = this.timestampColumn.length;
+ if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"InsertRowNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
insertRowNode,
pattern,
- sourceEvent.getStartTime(),
- sourceEvent.getEndTime(),
- sourceEvent);
+ this.sourceEvent.getStartTime(),
+ this.sourceEvent.getEndTime(),
+ this.sourceEvent);
}
}
@@ -186,6 +198,7 @@ public class TabletInsertionDataContainer {
this.isAligned = insertTabletNode.isAligned();
final long[] originTimestampColumn = insertTabletNode.getTimes();
+ final int originRowSize = originTimestampColumn.length;
List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn);
this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray();
@@ -215,12 +228,12 @@ public class TabletInsertionDataContainer {
(insertTabletNode.getBitMaps() == null
? IntStream.range(0, originColumnSize)
.boxed()
- .map(o -> new BitMap(timestampColumn.length))
+ .map(o -> new BitMap(originRowSize))
.toArray(BitMap[]::new)
: insertTabletNode.getBitMaps());
for (int i = 0; i < originBitMapList.length; i++) {
if (originBitMapList[i] == null) {
- originBitMapList[i] = new BitMap(timestampColumn.length);
+ originBitMapList[i] = new BitMap(originRowSize);
}
}
@@ -230,14 +243,25 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
- this.valueColumns[filteredColumnIndex] =
- filterValueColumnsByRowIndexList(
- originValueColumnTypes[i], originValueColumns[i], rowIndexList, false);
- this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+ BitMap bitMap = new BitMap(this.timestampColumn.length);
+ if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) {
+ this.valueColumns[filteredColumnIndex] = null;
+ bitMap.markAll();
+ } else {
+ this.valueColumns[filteredColumnIndex] =
+ filterValueColumnsByRowIndexList(
+ originValueColumnTypes[i],
+ originValueColumns[i],
+ rowIndexList,
+ false,
+ originBitMapList[i],
+ bitMap);
+ }
+ this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
}
}
- rowCount = timestampColumn.length;
+ this.rowCount = this.timestampColumn.length;
if (rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"InsertTabletNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
@@ -256,7 +280,9 @@ public class TabletInsertionDataContainer {
this.deviceId = tablet.deviceId;
this.isAligned = isAligned;
- final long[] originTimestampColumn = tablet.timestamps;
+ final long[] originTimestampColumn =
+ Arrays.copyOf(
+ tablet.timestamps, tablet.rowSize); // tablet.timestamps.length == tablet.maxRowNumber
List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn);
this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray();
@@ -287,17 +313,18 @@ public class TabletInsertionDataContainer {
originColumnNameStringList[i] = originMeasurementSchemaList.get(i).getMeasurementId();
originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
}
- final Object[] originValueColumns = tablet.values;
+ final Object[] originValueColumns =
+ tablet.values; // we do not reduce value columns here by origin row size
final BitMap[] originBitMapList =
tablet.bitMaps == null
? IntStream.range(0, originColumnSize)
.boxed()
- .map(o -> new BitMap(timestampColumn.length))
+ .map(o -> new BitMap(tablet.getMaxRowNumber()))
.toArray(BitMap[]::new)
- : tablet.bitMaps;
+ : tablet.bitMaps; // we do not reduce bitmaps here by origin row size
for (int i = 0; i < originBitMapList.length; i++) {
if (originBitMapList[i] == null) {
- originBitMapList[i] = new BitMap(timestampColumn.length);
+ originBitMapList[i] = new BitMap(tablet.getMaxRowNumber());
}
}
@@ -307,22 +334,33 @@ public class TabletInsertionDataContainer {
this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i);
this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
- this.valueColumns[filteredColumnIndex] =
- filterValueColumnsByRowIndexList(
- originValueColumnTypes[i], originValueColumns[i], rowIndexList, false);
- this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+ BitMap bitMap = new BitMap(this.timestampColumn.length);
+ if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) {
+ this.valueColumns[filteredColumnIndex] = null;
+ bitMap.markAll();
+ } else {
+ this.valueColumns[filteredColumnIndex] =
+ filterValueColumnsByRowIndexList(
+ originValueColumnTypes[i],
+ originValueColumns[i],
+ rowIndexList,
+ false,
+ originBitMapList[i],
+ bitMap);
+ }
+ this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
}
}
- rowCount = tablet.rowSize;
- if (rowCount == 0 && LOGGER.isDebugEnabled()) {
+ this.rowCount = this.timestampColumn.length;
+ if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Tablet({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
tablet,
pattern,
- sourceEvent.getStartTime(),
- sourceEvent.getEndTime(),
- sourceEvent);
+ this.sourceEvent.getStartTime(),
+ this.sourceEvent.getEndTime(),
+ this.sourceEvent);
}
}
@@ -394,10 +432,12 @@ public class TabletInsertionDataContainer {
}
private static Object filterValueColumnsByRowIndexList(
- TSDataType type,
- Object originValueColumn,
- List<Integer> rowIndexList,
- boolean isSingleOriginValueColumn) {
+ @NonNull TSDataType type,
+ @NonNull Object originValueColumn,
+ @NonNull List<Integer> rowIndexList,
+ boolean isSingleOriginValueColumn,
+ @NonNull BitMap originNullValueColumnBitmap,
+ @NonNull BitMap nullValueColumnBitmap /* output parameters */) {
switch (type) {
case INT32:
{
@@ -407,7 +447,12 @@ public class TabletInsertionDataContainer {
: (int[]) originValueColumn;
int[] valueColumns = new int[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- valueColumns[i] = intValueColumns[rowIndexList.get(i)];
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = intValueColumns[rowIndexList.get(i)];
+ }
}
return valueColumns;
}
@@ -419,7 +464,12 @@ public class TabletInsertionDataContainer {
: (long[]) originValueColumn;
long[] valueColumns = new long[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- valueColumns[i] = longValueColumns[rowIndexList.get(i)];
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0L;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = longValueColumns[rowIndexList.get(i)];
+ }
}
return valueColumns;
}
@@ -431,7 +481,12 @@ public class TabletInsertionDataContainer {
: (float[]) originValueColumn;
float[] valueColumns = new float[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- valueColumns[i] = floatValueColumns[rowIndexList.get(i)];
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0F;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = floatValueColumns[rowIndexList.get(i)];
+ }
}
return valueColumns;
}
@@ -443,7 +498,12 @@ public class TabletInsertionDataContainer {
: (double[]) originValueColumn;
double[] valueColumns = new double[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- valueColumns[i] = doubleValueColumns[rowIndexList.get(i)];
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0D;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = doubleValueColumns[rowIndexList.get(i)];
+ }
}
return valueColumns;
}
@@ -455,7 +515,12 @@ public class TabletInsertionDataContainer {
: (boolean[]) originValueColumn;
boolean[] valueColumns = new boolean[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- valueColumns[i] = booleanValueColumns[rowIndexList.get(i)];
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = false;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = booleanValueColumns[rowIndexList.get(i)];
+ }
}
return valueColumns;
}
@@ -467,7 +532,14 @@ public class TabletInsertionDataContainer {
: (Binary[]) originValueColumn;
Binary[] valueColumns = new Binary[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- valueColumns[i] = new Binary(binaryValueColumns[rowIndexList.get(i)].getValues());
+ if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)])
+ || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues())
+ || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = Binary.EMPTY_VALUE;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = new Binary(binaryValueColumns[rowIndexList.get(i)].getValues());
+ }
}
return valueColumns;
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 46543ceabf0..011d26f2ff0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -75,18 +75,18 @@ public class PipeTabletInsertionEventTest {
}
private void createMeasurementSchema() {
- for (int i = 0; i < 6; i++) {
+ for (int i = 0; i < schemas.length; i++) {
schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]);
}
}
private void createInsertRowNode() throws IllegalPathException {
- final Object[] values = new Object[6];
+ final Object[] values = new Object[schemas.length];
values[0] = 100;
values[1] = 10000L;
values[2] = 2F;
- values[3] = 1.0;
+ values[3] = 1D;
values[4] = false;
values[5] = BytesUtils.valueOf("text");
@@ -116,20 +116,20 @@ public class PipeTabletInsertionEventTest {
}
private void createInsertTabletNode() throws IllegalPathException {
- final Object[] values = new Object[6];
+ final Object[] values = new Object[schemas.length];
- values[0] = new int[5];
- values[1] = new long[5];
- values[2] = new float[5];
- values[3] = new double[5];
- values[4] = new boolean[5];
- values[5] = new Binary[5];
+ values[0] = new int[times.length];
+ values[1] = new long[times.length];
+ values[2] = new float[times.length];
+ values[3] = new double[times.length];
+ values[4] = new boolean[times.length];
+ values[5] = new Binary[times.length];
- for (int r = 0; r < 5; r++) {
+ for (int r = 0; r < times.length; r++) {
((int[]) values[0])[r] = 100;
- ((long[]) values[1])[r] = 10000;
- ((float[]) values[2])[r] = 2;
- ((double[]) values[3])[r] = 1.0;
+ ((long[]) values[1])[r] = 10000L;
+ ((float[]) values[2])[r] = 2F;
+ ((double[]) values[3])[r] = 1D;
((boolean[]) values[4])[r] = false;
((Binary[]) values[5])[r] = BytesUtils.valueOf("text");
}
@@ -162,11 +162,11 @@ public class PipeTabletInsertionEventTest {
}
private void createTablet() {
- final Object[] values = new Object[6];
+ final Object[] values = new Object[schemas.length];
// create tablet for insertRowNode
- BitMap[] bitMapsForInsertRowNode = new BitMap[6];
- for (int i = 0; i < 6; i++) {
+ BitMap[] bitMapsForInsertRowNode = new BitMap[schemas.length];
+ for (int i = 0; i < schemas.length; i++) {
bitMapsForInsertRowNode[i] = new BitMap(1);
}
@@ -179,9 +179,9 @@ public class PipeTabletInsertionEventTest {
for (int r = 0; r < 1; r++) {
((int[]) values[0])[r] = 100;
- ((long[]) values[1])[r] = 10000;
- ((float[]) values[2])[r] = 2;
- ((double[]) values[3])[r] = 1.0;
+ ((long[]) values[1])[r] = 10000L;
+ ((float[]) values[2])[r] = 2F;
+ ((double[]) values[3])[r] = 1D;
((boolean[]) values[4])[r] = false;
((Binary[]) values[5])[r] = BytesUtils.valueOf("text");
}
@@ -193,26 +193,27 @@ public class PipeTabletInsertionEventTest {
tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode;
// create tablet for insertTabletNode
- BitMap[] bitMapsForInsertTabletNode = new BitMap[6];
- for (int i = 0; i < 6; i++) {
+ BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length];
+ for (int i = 0; i < schemas.length; i++) {
bitMapsForInsertTabletNode[i] = new BitMap(times.length);
}
- values[0] = new int[5];
- values[1] = new long[5];
- values[2] = new float[5];
- values[3] = new double[5];
- values[4] = new boolean[5];
- values[5] = new Binary[5];
+ values[0] = new int[times.length];
+ values[1] = new long[times.length];
+ values[2] = new float[times.length];
+ values[3] = new double[times.length];
+ values[4] = new boolean[times.length];
+ values[5] = new Binary[times.length];
- for (int r = 0; r < 5; r++) {
+ for (int r = 0; r < times.length; r++) {
((int[]) values[0])[r] = 100;
- ((long[]) values[1])[r] = 10000;
- ((float[]) values[2])[r] = 2;
- ((double[]) values[3])[r] = 1.0;
+ ((long[]) values[1])[r] = 10000L;
+ ((float[]) values[2])[r] = 2F;
+ ((double[]) values[3])[r] = 1D;
((boolean[]) values[4])[r] = false;
((Binary[]) values[5])[r] = BytesUtils.valueOf("text");
}
+
tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length);
tabletForInsertTabletNode.values = values;
tabletForInsertTabletNode.timestamps = times;
@@ -294,13 +295,35 @@ public class PipeTabletInsertionEventTest {
TabletInsertionDataContainer container2 =
new TabletInsertionDataContainer(
null,
- new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L),
- insertTabletNode,
+ new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 110L),
+ insertRowNode,
pattern);
Tablet tablet2 = container2.convertToTablet();
- Assert.assertEquals(3, tablet2.rowSize);
+ Assert.assertEquals(1, tablet2.rowSize);
boolean isAligned2 = container2.isAligned();
Assert.assertFalse(isAligned2);
+
+ TabletInsertionDataContainer container3 =
+ new TabletInsertionDataContainer(
+ null,
+ new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L),
+ insertTabletNode,
+ pattern);
+ Tablet tablet3 = container3.convertToTablet();
+ Assert.assertEquals(3, tablet3.rowSize);
+ boolean isAligned3 = container3.isAligned();
+ Assert.assertFalse(isAligned3);
+
+ TabletInsertionDataContainer container4 =
+ new TabletInsertionDataContainer(
+ null,
+ new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, Long.MIN_VALUE, 109L),
+ insertTabletNode,
+ pattern);
+ Tablet tablet4 = container4.convertToTablet();
+ Assert.assertEquals(0, tablet4.rowSize);
+ boolean isAligned4 = container4.isAligned();
+ Assert.assertFalse(isAligned4);
}
@Test