You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/18 04:00:09 UTC
[iotdb] 01/02: Add UTs for new insert framework and fix insertRowNode memControl
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch dataRegionTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 138261028d07f2cecc96641286785c2b6574f70c
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Apr 18 11:59:03 2022 +0800
Add UTs for new insert framework and fix insertRowNode memControl
---
.../db/engine/storagegroup/TsFileProcessor.java | 214 +++--
.../db/engine/storagegroup/DataRegionTest.java | 910 +++++++++++++++++++++
.../engine/storagegroup/TsFileProcessorTest.java | 18 +
...ocessorTest.java => TsFileProcessorV2Test.java} | 104 ++-
4 files changed, 1086 insertions(+), 160 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9d5e0d323d..911eda092f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -223,9 +223,15 @@ public class TsFileProcessor {
long[] memIncrements = null;
if (enableMemControl) {
if (insertRowPlan.isAligned()) {
- memIncrements = checkAlignedMemCostAndAddToTspInfo(insertRowPlan);
+ memIncrements =
+ checkAlignedMemCostAndAddToTspInfo(
+ insertRowPlan.getDevicePath().getFullPath(), insertRowPlan.getDataTypes(),
+ insertRowPlan.getMeasurements(), insertRowPlan.getValues());
} else {
- memIncrements = checkMemCostAndAddToTspInfo(insertRowPlan);
+ memIncrements =
+ checkMemCostAndAddToTspInfo(
+ insertRowPlan.getDevicePath().getFullPath(), insertRowPlan.getDataTypes(),
+ insertRowPlan.getMeasurements(), insertRowPlan.getValues());
}
}
@@ -277,9 +283,15 @@ public class TsFileProcessor {
long[] memIncrements = null;
if (enableMemControl) {
if (insertRowNode.isAligned()) {
- // memIncrements = checkAlignedMemCostAndAddToTspInfo(insertRowNode);
+ memIncrements =
+ checkAlignedMemCostAndAddToTspInfo(
+ insertRowNode.getDevicePath().getFullPath(), insertRowNode.getDataTypes(),
+ insertRowNode.getMeasurements(), insertRowNode.getValues());
} else {
- // memIncrements = checkMemCostAndAddToTspInfo(insertRowNode);
+ memIncrements =
+ checkMemCostAndAddToTspInfo(
+ insertRowNode.getDevicePath().getFullPath(), insertRowNode.getDataTypes(),
+ insertRowNode.getMeasurements(), insertRowNode.getValues());
}
}
@@ -339,9 +351,23 @@ public class TsFileProcessor {
try {
if (enableMemControl) {
if (insertTabletPlan.isAligned()) {
- memIncrements = checkAlignedMemCostAndAddToTsp(insertTabletPlan, start, end);
+ memIncrements =
+ checkAlignedMemCostAndAddToTsp(
+ insertTabletPlan.getDevicePath().getFullPath(),
+ insertTabletPlan.getDataTypes(),
+ insertTabletPlan.getMeasurements(),
+ insertTabletPlan.getColumns(),
+ start,
+ end);
} else {
- memIncrements = checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+ memIncrements =
+ checkMemCostAndAddToTspInfo(
+ insertTabletPlan.getDevicePath().getFullPath(),
+ insertTabletPlan.getDataTypes(),
+ insertTabletPlan.getMeasurements(),
+ insertTabletPlan.getColumns(),
+ start,
+ end);
}
}
} catch (WriteProcessException e) {
@@ -422,9 +448,23 @@ public class TsFileProcessor {
try {
if (enableMemControl) {
if (insertTabletNode.isAligned()) {
- memIncrements = checkAlignedMemCostAndAddToTsp(insertTabletNode, start, end);
+ memIncrements =
+ checkAlignedMemCostAndAddToTsp(
+ insertTabletNode.getDevicePath().getFullPath(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getColumns(),
+ start,
+ end);
} else {
- memIncrements = checkMemCostAndAddToTspInfo(insertTabletNode, start, end);
+ memIncrements =
+ checkMemCostAndAddToTspInfo(
+ insertTabletNode.getDevicePath().getFullPath(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getColumns(),
+ start,
+ end);
}
}
} catch (WriteProcessException e) {
@@ -481,43 +521,41 @@ public class TsFileProcessor {
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
- private long[] checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+ private long[] checkMemCostAndAddToTspInfo(
+ String deviceId, TSDataType[] dataTypes, String[] measurements, Object[] values)
throws WriteProcessException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
// get device id
- IDeviceID deviceID = null;
+ IDeviceID deviceID;
try {
- deviceID = getDeviceID(insertRowPlan.getDevicePath().getFullPath());
+ deviceID = getDeviceID(deviceId);
} catch (IllegalPathException e) {
throw new WriteProcessException(e);
}
- for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+ for (int i = 0; i < dataTypes.length; i++) {
// skip failed Measurements
- if (insertRowPlan.getDataTypes()[i] == null || insertRowPlan.getMeasurements()[i] == null) {
+ if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- if (workMemTable.checkIfChunkDoesNotExist(deviceID, insertRowPlan.getMeasurements()[i])) {
+ if (workMemTable.checkIfChunkDoesNotExist(deviceID, measurements[i])) {
// ChunkMetadataIncrement
- chunkMetadataIncrement +=
- ChunkMetadata.calculateRamSize(
- insertRowPlan.getMeasurements()[i], insertRowPlan.getDataTypes()[i]);
- memTableIncrement += TVList.tvListArrayMemCost(insertRowPlan.getDataTypes()[i]);
+ chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
+ memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
} else {
// here currentChunkPointNum >= 1
- long currentChunkPointNum =
- workMemTable.getCurrentTVListSize(deviceID, insertRowPlan.getMeasurements()[i]);
+ long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceID, measurements[i]);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
- ? TVList.tvListArrayMemCost(insertRowPlan.getDataTypes()[i])
+ ? TVList.tvListArrayMemCost(dataTypes[i])
: 0;
}
// TEXT data mem size
- if (insertRowPlan.getDataTypes()[i] == TSDataType.TEXT) {
- textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
+ if (dataTypes[i] == TSDataType.TEXT) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
@@ -525,7 +563,8 @@ public class TsFileProcessor {
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
- private long[] checkAlignedMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+ private long[] checkAlignedMemCostAndAddToTspInfo(
+ String deviceId, TSDataType[] dataTypes, String[] measurements, Object[] values)
throws WriteProcessException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
long memTableIncrement = 0L;
@@ -533,9 +572,9 @@ public class TsFileProcessor {
long chunkMetadataIncrement = 0L;
AlignedWritableMemChunk alignedMemChunk = null;
// get device id
- IDeviceID deviceID = null;
+ IDeviceID deviceID;
try {
- deviceID = getDeviceID(insertRowPlan.getDevicePath().getFullPath());
+ deviceID = getDeviceID(deviceId);
} catch (IllegalPathException e) {
throw new WriteProcessException(e);
}
@@ -544,74 +583,47 @@ public class TsFileProcessor {
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
- * insertRowPlan.getDataTypes().length;
- memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(insertRowPlan.getDataTypes());
+ * dataTypes.length;
+ memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
} else {
// here currentChunkPointNum >= 1
long currentChunkPointNum =
workMemTable.getCurrentTVListSize(deviceID, AlignedPath.VECTOR_PLACEHOLDER);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
- ? AlignedTVList.alignedTvListArrayMemCost(insertRowPlan.getDataTypes())
+ ? AlignedTVList.alignedTvListArrayMemCost(dataTypes)
: 0;
alignedMemChunk =
((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceID))
.getAlignedMemChunk();
}
- for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
+ for (int i = 0; i < dataTypes.length; i++) {
// skip failed Measurements
- if (insertRowPlan.getDataTypes()[i] == null || insertRowPlan.getMeasurements()[i] == null) {
+ if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
// extending the column of aligned mem chunk
- if (alignedMemChunk != null
- && !alignedMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
+ if (alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) {
memTableIncrement +=
(alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
- * insertRowPlan.getDataTypes()[i].getDataTypeSize();
+ * dataTypes[i].getDataTypeSize();
}
// TEXT data mem size
- if (insertRowPlan.getDataTypes()[i] == TSDataType.TEXT) {
- textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
+ if (dataTypes[i] == TSDataType.TEXT) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}
- private long[] checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
- throws WriteProcessException {
- if (start >= end) {
- return new long[] {0, 0, 0};
- }
- long[] memIncrements = new long[3]; // memTable, text, chunk metadata
-
- // get device id
- IDeviceID deviceID = null;
- try {
- deviceID = getDeviceID(insertTabletPlan.getDevicePath().getFullPath());
- } catch (IllegalPathException e) {
- throw new WriteProcessException(e);
- }
-
- for (int i = 0; i < insertTabletPlan.getDataTypes().length; i++) {
- // skip failed Measurements
- TSDataType dataType = insertTabletPlan.getDataTypes()[i];
- String measurement = insertTabletPlan.getMeasurements()[i];
- Object column = insertTabletPlan.getColumns()[i];
- if (dataType == null || column == null || measurement == null) {
- continue;
- }
- updateMemCost(dataType, measurement, deviceID, start, end, memIncrements, column);
- }
- long memTableIncrement = memIncrements[0];
- long textDataIncrement = memIncrements[1];
- long chunkMetadataIncrement = memIncrements[2];
- updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
- return memIncrements;
- }
-
- private long[] checkMemCostAndAddToTspInfo(InsertTabletNode insertTabletNode, int start, int end)
+ private long[] checkMemCostAndAddToTspInfo(
+ String deviceId,
+ TSDataType[] dataTypes,
+ String[] measurements,
+ Object[] columns,
+ int start,
+ int end)
throws WriteProcessException {
if (start >= end) {
return new long[] {0, 0, 0};
@@ -619,53 +631,20 @@ public class TsFileProcessor {
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
// get device id
- IDeviceID deviceID = null;
+ IDeviceID deviceID;
try {
- deviceID = getDeviceID(insertTabletNode.getDevicePath().getFullPath());
+ deviceID = getDeviceID(deviceId);
} catch (IllegalPathException e) {
throw new WriteProcessException(e);
}
- for (int i = 0; i < insertTabletNode.getDataTypes().length; i++) {
+ for (int i = 0; i < dataTypes.length; i++) {
// skip failed Measurements
- TSDataType dataType = insertTabletNode.getDataTypes()[i];
- String measurement = insertTabletNode.getMeasurementSchemas()[i].getMeasurementId();
- Object column = insertTabletNode.getColumns()[i];
- if (dataType == null || column == null || measurement == null) {
+ if (dataTypes[i] == null || columns[i] == null || measurements[i] == null) {
continue;
}
- updateMemCost(dataType, measurement, deviceID, start, end, memIncrements, column);
- }
- long memTableIncrement = memIncrements[0];
- long textDataIncrement = memIncrements[1];
- long chunkMetadataIncrement = memIncrements[2];
- updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
- return memIncrements;
- }
-
- private long[] checkAlignedMemCostAndAddToTsp(
- InsertTabletPlan insertTabletPlan, int start, int end) throws WriteProcessException {
- if (start >= end) {
- return new long[] {0, 0, 0};
+ updateMemCost(dataTypes[i], measurements[i], deviceID, start, end, memIncrements, columns[i]);
}
- long[] memIncrements = new long[3]; // memTable, text, chunk metadata
-
- // get device id
- IDeviceID deviceID = null;
- try {
- deviceID = getDeviceID(insertTabletPlan.getDevicePath().getFullPath());
- } catch (IllegalPathException e) {
- throw new WriteProcessException(e);
- }
-
- updateAlignedMemCost(
- insertTabletPlan.getDataTypes(),
- deviceID,
- insertTabletPlan.getMeasurements(),
- start,
- end,
- memIncrements,
- insertTabletPlan.getColumns());
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
@@ -674,28 +653,27 @@ public class TsFileProcessor {
}
private long[] checkAlignedMemCostAndAddToTsp(
- InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException {
+ String deviceId,
+ TSDataType[] dataTypes,
+ String[] measurements,
+ Object[] columns,
+ int start,
+ int end)
+ throws WriteProcessException {
if (start >= end) {
return new long[] {0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
// get device id
- IDeviceID deviceID = null;
+ IDeviceID deviceID;
try {
- deviceID = getDeviceID(insertTabletNode.getDevicePath().getFullPath());
+ deviceID = getDeviceID(deviceId);
} catch (IllegalPathException e) {
throw new WriteProcessException(e);
}
- updateAlignedMemCost(
- insertTabletNode.getDataTypes(),
- deviceID,
- insertTabletNode.getMeasurements(),
- start,
- end,
- memIncrements,
- insertTabletNode.getColumns());
+ updateAlignedMemCost(dataTypes, deviceID, measurements, start, end, memIncrements, columns);
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
new file mode 100644
index 0000000000..a49e05acbd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
+import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTableManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataRegionTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final Logger logger = LoggerFactory.getLogger(DataRegionTest.class);
+
+ private String storageGroup = "root.vehicle.d0";
+ private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+ private String deviceId = "root.vehicle.d0";
+ private String measurementId = "s0";
+ private DataRegion dataRegion;
+ private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
+
+ /**
+  * Prepares the fixture for each test: initializes metadata and the test environment, creates the
+  * data region under test from {@code systemDir} and {@code storageGroup}, and finally starts the
+  * compaction task manager.
+  */
+ @Before
+ public void setUp() throws Exception {
+   MetadataManagerHelper.initMetadata();
+   EnvironmentUtils.envSetUp();
+   this.dataRegion = new DummyDataRegion(this.systemDir, this.storageGroup);
+   final CompactionTaskManager compactionTaskManager = CompactionTaskManager.getInstance();
+   compactionTaskManager.start();
+ }
+
+ /**
+  * Tears the fixture down after each test: deletes the region's data files, cleans the environment
+  * and the test output directory, and stops the compaction task manager.
+  */
+ @After
+ public void tearDown() throws Exception {
+   this.dataRegion.syncDeleteDataFiles();
+   EnvironmentUtils.cleanEnv();
+   EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+   final CompactionTaskManager compactionTaskManager = CompactionTaskManager.getInstance();
+   compactionTaskManager.stop();
+   // NOTE(review): cleanEnv() is invoked a second time, after the compaction manager has been
+   // stopped; presumably intentional - confirm before removing either call.
+   EnvironmentUtils.cleanEnv();
+ }
+
+ /**
+  * Converts a {@link TSRecord} into an {@link InsertRowNode}.
+  *
+  * <p>For every data point of the record, its type and value are copied and a schema is created
+  * with PLAIN encoding and no compression. The node is built with {@code false} as its third
+  * constructor argument (presumably the "aligned" flag - TODO confirm).
+  *
+  * @param record the record supplying the device id, the timestamp and the data points
+  * @return an insert node carrying the record's data
+  * @throws IllegalPathException if the device path is rejected (presumably by the {@code
+  *     PartialPath} constructor)
+  */
+ public static InsertRowNode buildInsertRowNodeByTSRecord(TSRecord record)
+     throws IllegalPathException {
+   final int pointCount = record.dataPointList.size();
+   MeasurementSchema[] schemas = new MeasurementSchema[pointCount];
+   TSDataType[] types = new TSDataType[pointCount];
+   Object[] rowValues = new Object[pointCount];
+
+   for (int idx = 0; idx < pointCount; idx++) {
+     DataPoint point = record.dataPointList.get(idx);
+     String measurement = point.getMeasurementId();
+     types[idx] = point.getType();
+     rowValues[idx] = point.getValue();
+     schemas[idx] =
+         new MeasurementSchema(
+             measurement, point.getType(), TSEncoding.PLAIN, CompressionType.UNCOMPRESSED);
+   }
+
+   return new InsertRowNode(
+       new QueryId("test_write").genPlanNodeId(),
+       new PartialPath(record.deviceId),
+       false,
+       schemas,
+       types,
+       record.time,
+       rowValues);
+ }
+
+ /**
+  * Inserts one point at time 10000 and closes all working processors, then inserts times 1..10,
+  * flushes the working unsequence processors, inserts times 11..20 and issues a delete with the
+  * bounds 0 and 15. The in-memory chunks returned by the working unsequence processors must then
+  * yield timestamps counting up from 16.
+  */
+ @Test
+ public void testUnseqUnsealedDelete()
+     throws WriteProcessException, IOException, MetadataException, TriggerExecutionException {
+   TSRecord firstRow = new TSRecord(10000, deviceId);
+   firstRow.addTuple(
+       DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+   dataRegion.insert(buildInsertRowNodeByTSRecord(firstRow));
+   dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+   // first batch: times 1..10, flushed afterwards
+   for (int t = 1; t <= 10; t++) {
+     TSRecord row = new TSRecord(t, deviceId);
+     row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(t)));
+     dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+   }
+   for (TsFileProcessor processor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+     processor.syncFlush();
+   }
+
+   // second batch: times 11..20, left unflushed
+   for (int t = 11; t <= 20; t++) {
+     TSRecord row = new TSRecord(t, deviceId);
+     row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(t)));
+     dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+   }
+
+   MeasurementSchema schema =
+       new MeasurementSchema(
+           measurementId,
+           TSDataType.INT32,
+           TSEncoding.RLE,
+           CompressionType.UNCOMPRESSED,
+           Collections.emptyMap());
+   PartialPath fullPath = new MeasurementPath(deviceId, measurementId, schema);
+
+   dataRegion.delete(new PartialPath(deviceId, measurementId), 0, 15L, -1, null);
+
+   List<TsFileResource> queriedResources = new ArrayList<>();
+   for (TsFileProcessor processor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+     processor.query(
+         Collections.singletonList(fullPath),
+         EnvironmentUtils.TEST_QUERY_CONTEXT,
+         queriedResources);
+   }
+   Assert.assertEquals(1, queriedResources.size());
+
+   // every remaining point must follow the previous one by exactly one, starting at 16
+   long expectedTime = 16;
+   for (ReadOnlyMemChunk chunk : queriedResources.get(0).getReadOnlyMemChunk(fullPath)) {
+     IPointReader reader = chunk.getPointReader();
+     while (reader.hasNextTimeValuePair()) {
+       TimeValuePair pair = reader.nextTimeValuePair();
+       Assert.assertEquals(expectedTime, pair.getTimestamp());
+       expectedTime++;
+     }
+   }
+ }
+
+ /**
+  * Inserts ten rows (times 1..10), requesting an asynchronous close of all working processors
+  * after each row, then closes synchronously and expects the query to report ten sequence
+  * resources, all of them closed.
+  */
+ @Test
+ public void testSequenceSyncClose()
+     throws WriteProcessException, QueryProcessException, IllegalPathException,
+         TriggerExecutionException {
+   for (int j = 1; j <= 10; j++) {
+     TSRecord record = new TSRecord(j, deviceId);
+     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+     dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+     dataRegion.asyncCloseAllWorkingTsFileProcessors();
+   }
+
+   // presumably gives the asynchronous closes time to progress before the synchronous close
+   try {
+     Thread.sleep(1000);
+   } catch (InterruptedException e) {
+     // restore the interrupt flag instead of swallowing it, and report through the class logger
+     // rather than printing the stack trace to stderr
+     Thread.currentThread().interrupt();
+     logger.warn("Interrupted while waiting for the asynchronous close to finish", e);
+   }
+   dataRegion.syncCloseAllWorkingTsFileProcessors();
+   QueryDataSource queryDataSource =
+       dataRegion.query(
+           Collections.singletonList(new PartialPath(deviceId, measurementId)),
+           deviceId,
+           context,
+           null,
+           null);
+   Assert.assertEquals(10, queryDataSource.getSeqResources().size());
+   for (TsFileResource resource : queryDataSource.getSeqResources()) {
+     Assert.assertTrue(resource.isClosed());
+   }
+ }
+
+ /**
+  * Runs two identical write rounds (times 0..9, an asynchronous close after every row, then a
+  * synchronous close), removing every partition between the rounds, and expects the final query
+  * to report no unsequence resources.
+  */
+ @Test
+ public void testInsertDataAndRemovePartitionAndInsert()
+     throws WriteProcessException, QueryProcessException, IllegalPathException,
+         TriggerExecutionException {
+   for (int round = 0; round < 2; round++) {
+     for (int t = 0; t < 10; t++) {
+       TSRecord row = new TSRecord(t, deviceId);
+       row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(t)));
+       dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+       dataRegion.asyncCloseAllWorkingTsFileProcessors();
+     }
+     dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+     if (round == 0) {
+       // the filter accepts every (storage group, partition) pair
+       dataRegion.removePartitions((storageGroupName, timePartitionId) -> true);
+     }
+   }
+
+   PartialPath seriesPath = new PartialPath(deviceId, measurementId);
+   QueryDataSource dataSource =
+       dataRegion.query(Collections.singletonList(seriesPath), deviceId, context, null, null);
+   Assert.assertEquals(0, dataSource.getUnseqResources().size());
+ }
+
+ /**
+  * Writes two 100-row tablets for s0 (INT32) and s1 (INT64) - times 0..99, then times 50..149 -
+  * requesting an asynchronous close after each, closes synchronously and expects the query to
+  * report two sequence resources (all closed) and one unsequence resource.
+  */
+ @Test
+ public void testIoTDBTabletWriteAndSyncClose()
+     throws QueryProcessException, IllegalPathException, TriggerExecutionException {
+   TSDataType[] dataTypes = new TSDataType[2];
+   dataTypes[0] = TSDataType.INT32;
+   dataTypes[1] = TSDataType.INT64;
+
+   MeasurementSchema[] measurementSchemas = new MeasurementSchema[2];
+   measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN);
+   measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN);
+
+   long[] times = new long[100];
+   Object[] columns = new Object[2];
+   columns[0] = new int[100];
+   columns[1] = new long[100];
+
+   for (int r = 0; r < 100; r++) {
+     times[r] = r;
+     ((int[]) columns[0])[r] = 1;
+     ((long[]) columns[1])[r] = 1;
+   }
+
+   InsertTabletNode insertTabletNode1 =
+       new InsertTabletNode(
+           new QueryId("test_write").genPlanNodeId(),
+           new PartialPath("root.vehicle.d0"),
+           false,
+           measurementSchemas,
+           dataTypes,
+           times,
+           null,
+           columns,
+           times.length);
+
+   dataRegion.insertTablet(insertTabletNode1);
+   dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+   // Refill all 100 rows with times 50..149. The previous bound (r < 149) rewrote only 99 rows
+   // and left the stale timestamp 99 in the last slot, i.e. after 148.
+   for (int r = 50; r < 150; r++) {
+     times[r - 50] = r;
+     ((int[]) columns[0])[r - 50] = 1;
+     ((long[]) columns[1])[r - 50] = 1;
+   }
+
+   InsertTabletNode insertTabletNode2 =
+       new InsertTabletNode(
+           new QueryId("test_write").genPlanNodeId(),
+           new PartialPath("root.vehicle.d0"),
+           false,
+           measurementSchemas,
+           dataTypes,
+           times,
+           null,
+           columns,
+           times.length);
+
+   dataRegion.insertTablet(insertTabletNode2);
+   dataRegion.asyncCloseAllWorkingTsFileProcessors();
+   dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+   QueryDataSource queryDataSource =
+       dataRegion.query(
+           Collections.singletonList(new PartialPath(deviceId, measurementId)),
+           deviceId,
+           context,
+           null,
+           null);
+
+   Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+   Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
+   for (TsFileResource resource : queryDataSource.getSeqResources()) {
+     Assert.assertTrue(resource.isClosed());
+   }
+ }
+
+ /**
+  * Inserts times 21..30 in ascending order and then times 10..1 in descending order, requesting an
+  * asynchronous close after every row and a synchronous close after each batch. The query must
+  * then report ten sequence and ten unsequence resources, all of them closed.
+  */
+ @Test
+ public void testSeqAndUnSeqSyncClose()
+     throws WriteProcessException, QueryProcessException, IllegalPathException,
+         TriggerExecutionException {
+   // first batch: 21, 22, ..., 30
+   for (int offset = 0; offset < 10; offset++) {
+     int t = 21 + offset;
+     TSRecord row = new TSRecord(t, deviceId);
+     row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(t)));
+     dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+     dataRegion.asyncCloseAllWorkingTsFileProcessors();
+   }
+   dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+   // second batch: 10, 9, ..., 1
+   for (int offset = 0; offset < 10; offset++) {
+     int t = 10 - offset;
+     TSRecord row = new TSRecord(t, deviceId);
+     row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(t)));
+     dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+     dataRegion.asyncCloseAllWorkingTsFileProcessors();
+   }
+   dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+   PartialPath seriesPath = new PartialPath(deviceId, measurementId);
+   QueryDataSource dataSource =
+       dataRegion.query(Collections.singletonList(seriesPath), deviceId, context, null, null);
+
+   Assert.assertEquals(10, dataSource.getSeqResources().size());
+   Assert.assertEquals(10, dataSource.getUnseqResources().size());
+   for (TsFileResource seqResource : dataSource.getSeqResources()) {
+     Assert.assertTrue(seqResource.isClosed());
+   }
+   for (TsFileResource unseqResource : dataSource.getUnseqResources()) {
+     Assert.assertTrue(unseqResource.isClosed());
+   }
+ }
+
+ /**
+  * With "discard out-of-order data" switched on, inserts times 21..30 and then times 10..1 (an
+  * asynchronous close after every row, a synchronous close after each batch) and expects the query
+  * to report ten closed sequence resources and no unsequence resources.
+  *
+  * <p>The shared configuration is restored in a {@code finally} block so that a failing assertion
+  * cannot leak the modified setting into other tests.
+  */
+ @Test
+ public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
+     throws WriteProcessException, QueryProcessException, IllegalPathException, IOException,
+         TriggerExecutionException {
+   boolean defaultValue = config.isEnableDiscardOutOfOrderData();
+   config.setEnableDiscardOutOfOrderData(true);
+
+   try {
+     for (int j = 21; j <= 30; j++) {
+       TSRecord record = new TSRecord(j, deviceId);
+       record.addTuple(
+           DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+       dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+       dataRegion.asyncCloseAllWorkingTsFileProcessors();
+     }
+     dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+     for (int j = 10; j >= 1; j--) {
+       TSRecord record = new TSRecord(j, deviceId);
+       record.addTuple(
+           DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+       dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+       dataRegion.asyncCloseAllWorkingTsFileProcessors();
+     }
+
+     dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+     for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+       tsfileProcessor.syncFlush();
+     }
+
+     QueryDataSource queryDataSource =
+         dataRegion.query(
+             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+             deviceId,
+             context,
+             null,
+             null);
+     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
+     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+     for (TsFileResource resource : queryDataSource.getSeqResources()) {
+       Assert.assertTrue(resource.isClosed());
+     }
+     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+       Assert.assertTrue(resource.isClosed());
+     }
+   } finally {
+     // always put the shared static config back, even when an assertion above fails
+     config.setEnableDiscardOutOfOrderData(defaultValue);
+   }
+ }
+
+ /**
+  * With "discard out-of-order data" and partitioning switched on and a partition interval of 100,
+  * writes two 100-row tablets for s0 (INT32) and s1 (INT64) - times 0..99, then times 50..149 -
+  * and expects the query to report two closed sequence resources and no unsequence resources.
+  *
+  * <p>The three modified configuration values are restored in a {@code finally} block so that a
+  * failing assertion cannot leak them into other tests.
+  */
+ @Test
+ public void testEnableDiscardOutOfOrderDataForInsertTablet1()
+     throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
+   boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+   long defaultTimePartition = config.getPartitionInterval();
+   boolean defaultEnablePartition = config.isEnablePartition();
+   config.setEnableDiscardOutOfOrderData(true);
+   config.setEnablePartition(true);
+   config.setPartitionInterval(100);
+
+   try {
+     TSDataType[] dataTypes = new TSDataType[2];
+     dataTypes[0] = TSDataType.INT32;
+     dataTypes[1] = TSDataType.INT64;
+
+     MeasurementSchema[] measurementSchemas = new MeasurementSchema[2];
+     measurementSchemas[0] = new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN);
+     measurementSchemas[1] = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN);
+
+     long[] times = new long[100];
+     Object[] columns = new Object[2];
+     columns[0] = new int[100];
+     columns[1] = new long[100];
+
+     for (int r = 0; r < 100; r++) {
+       times[r] = r;
+       ((int[]) columns[0])[r] = 1;
+       ((long[]) columns[1])[r] = 1;
+     }
+     InsertTabletNode insertTabletNode1 =
+         new InsertTabletNode(
+             new QueryId("test_write").genPlanNodeId(),
+             new PartialPath("root.vehicle.d0"),
+             false,
+             measurementSchemas,
+             dataTypes,
+             times,
+             null,
+             columns,
+             times.length);
+
+     dataRegion.insertTablet(insertTabletNode1);
+     dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+     // refill all 100 rows with times 50..149 (filled from the last slot down to the first)
+     for (int r = 149; r >= 50; r--) {
+       times[r - 50] = r;
+       ((int[]) columns[0])[r - 50] = 1;
+       ((long[]) columns[1])[r - 50] = 1;
+     }
+     InsertTabletNode insertTabletNode2 =
+         new InsertTabletNode(
+             new QueryId("test_write").genPlanNodeId(),
+             new PartialPath("root.vehicle.d0"),
+             false,
+             measurementSchemas,
+             dataTypes,
+             times,
+             null,
+             columns,
+             times.length);
+
+     dataRegion.insertTablet(insertTabletNode2);
+     dataRegion.asyncCloseAllWorkingTsFileProcessors();
+     dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+     for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+       tsfileProcessor.syncFlush();
+     }
+
+     QueryDataSource queryDataSource =
+         dataRegion.query(
+             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+             deviceId,
+             context,
+             null,
+             null);
+
+     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+     for (TsFileResource resource : queryDataSource.getSeqResources()) {
+       Assert.assertTrue(resource.isClosed());
+     }
+   } finally {
+     // always put the shared static config back, even when an assertion above fails
+     config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+     config.setPartitionInterval(defaultTimePartition);
+     config.setEnablePartition(defaultEnablePartition);
+   }
+ }
+
+  /**
+   * Inserts two overlapping tablets into {@code root.vehicle.d0} while discarding of out-of-order
+   * data is enabled and the partition interval is set to 1200.
+   *
+   * <p>The first tablet carries timestamps 0..1199, the second carries 50..1249; the working
+   * TsFile processors are closed after each insert. The query is then expected to report two
+   * closed sequence resources and no unsequence resources.
+   */
+  @Test
+  public void testEnableDiscardOutOfOrderDataForInsertTablet2()
+      throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
+    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    long defaultTimePartition = config.getPartitionInterval();
+    boolean defaultEnablePartition = config.isEnablePartition();
+    try {
+      config.setEnableDiscardOutOfOrderData(true);
+      config.setEnablePartition(true);
+      config.setPartitionInterval(1200);
+
+      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
+      MeasurementSchema[] measurementSchemas =
+          new MeasurementSchema[] {
+            new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
+            new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
+          };
+
+      long[] times = new long[1200];
+      Object[] columns = new Object[2];
+      columns[0] = new int[1200];
+      columns[1] = new long[1200];
+
+      // First tablet: timestamps 0..1199, every value is 1.
+      for (int r = 0; r < 1200; r++) {
+        times[r] = r;
+        ((int[]) columns[0])[r] = 1;
+        ((long[]) columns[1])[r] = 1;
+      }
+      InsertTabletNode insertTabletNode1 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode1);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+      // Second tablet reuses the same arrays: timestamps 50..1249 (ascending), every value is 1.
+      for (int i = 0; i < 1200; i++) {
+        times[i] = i + 50;
+        ((int[]) columns[0])[i] = 1;
+        ((long[]) columns[1])[i] = 1;
+      }
+      InsertTabletNode insertTabletNode2 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode2);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+        tsfileProcessor.syncFlush();
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      // Restore the previous configuration even when an insert or assertion fails.
+      config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+      config.setPartitionInterval(defaultTimePartition);
+      config.setEnablePartition(defaultEnablePartition);
+    }
+  }
+
+  /**
+   * Inserts two overlapping tablets into {@code root.vehicle.d0} while discarding of out-of-order
+   * data is enabled and the partition interval is set to 1000.
+   *
+   * <p>The first tablet carries timestamps 0..1199, the second carries 50..1249; the working
+   * TsFile processors are closed after each insert. The query is then expected to report two
+   * closed sequence resources and no unsequence resources.
+   */
+  @Test
+  public void testEnableDiscardOutOfOrderDataForInsertTablet3()
+      throws QueryProcessException, IllegalPathException, IOException, TriggerExecutionException {
+    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    long defaultTimePartition = config.getPartitionInterval();
+    boolean defaultEnablePartition = config.isEnablePartition();
+    try {
+      config.setEnableDiscardOutOfOrderData(true);
+      config.setEnablePartition(true);
+      config.setPartitionInterval(1000);
+
+      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64};
+      MeasurementSchema[] measurementSchemas =
+          new MeasurementSchema[] {
+            new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN),
+            new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)
+          };
+
+      long[] times = new long[1200];
+      Object[] columns = new Object[2];
+      columns[0] = new int[1200];
+      columns[1] = new long[1200];
+
+      // First tablet: timestamps 0..1199, every value is 1.
+      for (int r = 0; r < 1200; r++) {
+        times[r] = r;
+        ((int[]) columns[0])[r] = 1;
+        ((long[]) columns[1])[r] = 1;
+      }
+      InsertTabletNode insertTabletNode1 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode1);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+      // Second tablet reuses the same arrays: timestamps 50..1249 (ascending), every value is 1.
+      for (int i = 0; i < 1200; i++) {
+        times[i] = i + 50;
+        ((int[]) columns[0])[i] = 1;
+        ((long[]) columns[1])[i] = 1;
+      }
+      InsertTabletNode insertTabletNode2 =
+          new InsertTabletNode(
+              new QueryId("test_write").genPlanNodeId(),
+              new PartialPath("root.vehicle.d0"),
+              false,
+              measurementSchemas,
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length);
+
+      dataRegion.insertTablet(insertTabletNode2);
+      dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+        tsfileProcessor.syncFlush();
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      // Restore the previous configuration even when an insert or assertion fails.
+      config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+      config.setPartitionInterval(defaultTimePartition);
+      config.setEnablePartition(defaultEnablePartition);
+    }
+  }
+
+  /**
+   * Writes timestamps 21..30 and then 10..1 (one record per insert, requesting a close of the
+   * working processors after each), triggers {@code dataRegion.compact()} and waits for the
+   * compaction task manager to report no executing tasks.
+   *
+   * <p>Afterwards the query must report two sequence resources, and every sequence and unsequence
+   * resource it returns must be closed. The test fails if compaction is still running after 120
+   * seconds or if the waiting thread is interrupted.
+   */
+  @Test
+  public void testMerge()
+      throws WriteProcessException, QueryProcessException, IllegalPathException,
+          TriggerExecutionException {
+    int originCandidateFileNum =
+        IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+    boolean originEnableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    boolean originEnableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    try {
+      IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(9);
+      IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
+      IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
+
+      for (int j = 21; j <= 30; j++) {
+        TSRecord record = new TSRecord(j, deviceId);
+        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      // These timestamps are smaller than everything written above.
+      for (int j = 10; j >= 1; j--) {
+        TSRecord record = new TSRecord(j, deviceId);
+        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+      }
+
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      dataRegion.compact();
+
+      // Poll every 100 ms until no compaction task is executing, for at most 120 seconds.
+      long totalWaitingTime = 0;
+      while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // Keep the interrupt status visible to the caller and stop waiting.
+          Thread.currentThread().interrupt();
+          Assert.fail("Interrupted while waiting for compaction tasks to finish");
+        }
+        totalWaitingTime += 100;
+        if (totalWaitingTime % 1000 == 0) {
+          logger.warn("has waited for {} seconds", totalWaitingTime / 1000);
+        }
+        if (totalWaitingTime > 120_000) {
+          Assert.fail("Compaction tasks did not finish within 120 seconds");
+        }
+      }
+
+      QueryDataSource queryDataSource =
+          dataRegion.query(
+              Collections.singletonList(new PartialPath(deviceId, measurementId)),
+              deviceId,
+              context,
+              null,
+              null);
+      Assert.assertEquals(2, queryDataSource.getSeqResources().size());
+      for (TsFileResource resource : queryDataSource.getSeqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+      for (TsFileResource resource : queryDataSource.getUnseqResources()) {
+        Assert.assertTrue(resource.isClosed());
+      }
+    } finally {
+      // Restore the previous compaction configuration even when the test fails.
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setMaxInnerCompactionCandidateFileNum(originCandidateFileNum);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setEnableSeqSpaceCompaction(originEnableSeqSpaceCompaction);
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setEnableUnseqSpaceCompaction(originEnableUnseqSpaceCompaction);
+    }
+  }
+
+  /**
+   * Submits an inner-space compaction task over the sequence files and, shortly afterwards,
+   * deletes the storage group through the storage engine.
+   *
+   * <p>After a short pause the test checks that no listed sequence file, compaction target file
+   * or compaction log file exists on disk, that the system is not read-only, and that the TsFile
+   * manager still allows compaction. The compaction configuration is restored in all cases.
+   */
+  @Test
+  public void testDeleteStorageGroupWhenCompacting() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(10);
+    try {
+      // Insert ten records, requesting an asynchronous close of the working processors each time.
+      int timestamp = 0;
+      while (timestamp < 10) {
+        TSRecord tsRecord = new TSRecord(timestamp, deviceId);
+        tsRecord.addTuple(
+            DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(timestamp)));
+        dataRegion.insert(buildInsertRowNodeByTSRecord(tsRecord));
+        dataRegion.asyncCloseAllWorkingTsFileProcessors();
+        timestamp++;
+      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+      InnerSpaceCompactionTask compactionTask =
+          new InnerSpaceCompactionTask(
+              0,
+              dataRegion.getTsFileManager(),
+              dataRegion.getSequenceFileList(),
+              true,
+              new ReadChunkCompactionPerformer(dataRegion.getSequenceFileList()),
+              new AtomicInteger(0));
+      CompactionTaskManager.getInstance().submitTask(compactionTask);
+
+      // Brief pause after submitting, then delete the storage group and pause again.
+      Thread.sleep(20);
+      StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup));
+      Thread.sleep(500);
+
+      for (TsFileResource seqResource : dataRegion.getSequenceFileList()) {
+        Assert.assertFalse(seqResource.getTsFile().exists());
+      }
+
+      TsFileResource compactionTarget =
+          TsFileNameGenerator.getInnerCompactionTargetFileResource(
+              dataRegion.getSequenceFileList(), true);
+      Assert.assertFalse(compactionTarget.getTsFile().exists());
+
+      // The compaction log sits next to the target file and is named after it.
+      String targetDirectory = compactionTarget.getTsFile().getParent();
+      String compactionLogName =
+          compactionTarget.getTsFile().getName()
+              + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
+      File compactionLog = new File(targetDirectory + File.separator + compactionLogName);
+      Assert.assertFalse(compactionLog.exists());
+
+      Assert.assertFalse(IoTDBDescriptor.getInstance().getConfig().isReadOnly());
+      Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
+    } finally {
+      new CompactionConfigRestorer().restoreCompactionConfig();
+    }
+  }
+
+  /**
+   * Creates one sequence memtable, enables timed flushing of sequence memtables with an interval
+   * of 5, reboots the storage engine's timed service and calls
+   * {@code dataRegion.timedFlushSeqMemTable()}.
+   *
+   * <p>The test then waits until neither the processor nor the flush manager reports outstanding
+   * flush work and expects the memtable count to drop to zero. It fails if the flush has not
+   * completed after 120 seconds.
+   */
+  @Test
+  public void testTimedFlushSeqMemTable()
+      throws IllegalPathException, InterruptedException, WriteProcessException,
+          TriggerExecutionException, ShutdownException {
+    // create one sequence memtable
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
+    long prevFlushInterval = config.getSeqMemtableFlushInterval();
+    try {
+      // change config & reboot timed service
+      config.setEnableTimedFlushSeqMemtable(true);
+      config.setSeqMemtableFlushInterval(5);
+      StorageEngine.getInstance().rebootTimedService();
+
+      Thread.sleep(500);
+
+      Assert.assertEquals(1, dataRegion.getWorkSequenceTsFileProcessors().size());
+      TsFileProcessor tsFileProcessor =
+          dataRegion.getWorkSequenceTsFileProcessors().iterator().next();
+      FlushManager flushManager = FlushManager.getInstance();
+
+      // flush the sequence memtable
+      dataRegion.timedFlushSeqMemTable();
+
+      // wait until memtable flush task is done; each iteration sleeps 500 ms
+      int waitCnt = 0;
+      while (tsFileProcessor.getFlushingMemTableSize() != 0
+          || tsFileProcessor.isManagedByFlushManager()
+          || flushManager.getNumberOfPendingTasks() != 0
+          || flushManager.getNumberOfPendingSubTasks() != 0
+          || flushManager.getNumberOfWorkingTasks() != 0
+          || flushManager.getNumberOfWorkingSubTasks() != 0) {
+        Thread.sleep(500);
+        ++waitCnt;
+        if (waitCnt % 10 == 0) {
+          logger.info("already wait {} s", waitCnt / 2);
+        }
+        // 240 iterations * 500 ms = 120 s; fail instead of hanging the test run forever.
+        if (waitCnt > 240) {
+          Assert.fail("Sequence memtable flush did not finish within 120 seconds");
+        }
+      }
+
+      Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+    } finally {
+      // Restore the previous flush configuration even when the test fails.
+      config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
+      config.setSeqMemtableFlushInterval(prevFlushInterval);
+    }
+  }
+
+  /**
+   * Creates and closes one sequence memtable, then creates an unsequence memtable by inserting an
+   * earlier timestamp, enables timed flushing of unsequence memtables with an interval of 5,
+   * reboots the storage engine's timed service and calls
+   * {@code dataRegion.timedFlushUnseqMemTable()}.
+   *
+   * <p>The test then waits until neither the processor nor the flush manager reports outstanding
+   * flush work and expects the memtable count to drop to zero. It fails if the flush has not
+   * completed after 120 seconds.
+   */
+  @Test
+  public void testTimedFlushUnseqMemTable()
+      throws IllegalPathException, InterruptedException, WriteProcessException,
+          TriggerExecutionException, ShutdownException {
+    // create one sequence memtable & close
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    // create one unsequence memtable
+    record = new TSRecord(1, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+    boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable();
+    long prevFlushInterval = config.getUnseqMemtableFlushInterval();
+    try {
+      // change config & reboot timed service
+      config.setEnableTimedFlushUnseqMemtable(true);
+      config.setUnseqMemtableFlushInterval(5);
+      StorageEngine.getInstance().rebootTimedService();
+
+      Thread.sleep(500);
+
+      Assert.assertEquals(1, dataRegion.getWorkUnsequenceTsFileProcessors().size());
+      TsFileProcessor tsFileProcessor =
+          dataRegion.getWorkUnsequenceTsFileProcessors().iterator().next();
+      FlushManager flushManager = FlushManager.getInstance();
+
+      // flush the unsequence memtable
+      dataRegion.timedFlushUnseqMemTable();
+
+      // wait until memtable flush task is done; each iteration sleeps 500 ms
+      int waitCnt = 0;
+      while (tsFileProcessor.getFlushingMemTableSize() != 0
+          || tsFileProcessor.isManagedByFlushManager()
+          || flushManager.getNumberOfPendingTasks() != 0
+          || flushManager.getNumberOfPendingSubTasks() != 0
+          || flushManager.getNumberOfWorkingTasks() != 0
+          || flushManager.getNumberOfWorkingSubTasks() != 0) {
+        Thread.sleep(500);
+        ++waitCnt;
+        if (waitCnt % 10 == 0) {
+          logger.info("already wait {} s", waitCnt / 2);
+        }
+        // 240 iterations * 500 ms = 120 s; fail instead of hanging the test run forever.
+        if (waitCnt > 240) {
+          Assert.fail("Unsequence memtable flush did not finish within 120 seconds");
+        }
+      }
+
+      Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+    } finally {
+      // Restore the previous flush configuration even when the test fails.
+      config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable);
+      config.setUnseqMemtableFlushInterval(prevFlushInterval);
+    }
+  }
+
+ /**
+ * Test-only {@link DataRegion} subclass with a two-argument constructor. It forwards to the
+ * four-argument super constructor, always passing the literal {@code "0"} as the second
+ * argument and a fresh {@code TsFileFlushPolicy.DirectFlushPolicy} as the flush policy.
+ *
+ * <p>Package-private and static so other tests in this package (for example
+ * {@code TsFileProcessorV2Test}) can instantiate it without a {@code DataRegionTest} instance.
+ */
+ static class DummyDataRegion extends DataRegion {
+
+ DummyDataRegion(String systemInfoDir, String storageGroupName) throws DataRegionException {
+ super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), storageGroupName);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index ef769fb736..196d56a5e1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -64,6 +64,7 @@ import java.util.List;
import java.util.Map;
import static junit.framework.TestCase.assertTrue;
+import static org.apache.iotdb.db.engine.storagegroup.DataRegionTest.buildInsertRowNodeByTSRecord;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -311,6 +312,15 @@ public class TsFileProcessorTest {
Assert.assertEquals(828424, memTable.getTVListsRamCost());
Assert.assertEquals(90000, memTable.getTotalPointsNum());
Assert.assertEquals(720360, memTable.memSize());
+ // Test records
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ Assert.assertEquals(830120, memTable.getTVListsRamCost());
+ Assert.assertEquals(90100, memTable.getTotalPointsNum());
+ Assert.assertEquals(721560, memTable.memSize());
}
@Test
@@ -337,6 +347,14 @@ public class TsFileProcessorTest {
Assert.assertEquals(1656000, memTable.getTVListsRamCost());
Assert.assertEquals(90000, memTable.getTotalPointsNum());
Assert.assertEquals(1440000, memTable.memSize());
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ Assert.assertEquals(1657696, memTable.getTVListsRamCost());
+ Assert.assertEquals(90100, memTable.getTotalPointsNum());
+ Assert.assertEquals(1441200, memTable.memSize());
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
similarity index 84%
copy from server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
copy to server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index ef769fb736..f97bf51030 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -24,16 +24,15 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -45,7 +44,6 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -64,30 +62,35 @@ import java.util.List;
import java.util.Map;
import static junit.framework.TestCase.assertTrue;
+import static org.apache.iotdb.db.engine.storagegroup.DataRegionTest.buildInsertRowNodeByTSRecord;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-public class TsFileProcessorTest {
+public class TsFileProcessorV2Test {
private TsFileProcessor processor;
- private String storageGroup = "root.vehicle";
- private StorageGroupInfo sgInfo = new StorageGroupInfo(null);
- private String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 0, 0);
- private String deviceId = "root.vehicle.d0";
- private String measurementId = "s0";
- private TSDataType dataType = TSDataType.INT32;
- private TSEncoding encoding = TSEncoding.RLE;
- private Map<String, String> props = Collections.emptyMap();
+ private final String storageGroup = "root.vehicle";
+ private StorageGroupInfo sgInfo;
+ private final String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 0, 0);
+ private final String deviceId = "root.vehicle.d0";
+ private final String measurementId = "s0";
+ private final TSDataType dataType = TSDataType.INT32;
+ private final TSEncoding encoding = TSEncoding.RLE;
+ private final Map<String, String> props = Collections.emptyMap();
private QueryContext context;
- private static Logger logger = LoggerFactory.getLogger(TsFileProcessorTest.class);
+ private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+ private static final Logger logger = LoggerFactory.getLogger(TsFileProcessorV2Test.class);
+
+ public TsFileProcessorV2Test() {}
@Before
- public void setUp() {
+ public void setUp() throws DataRegionException {
File file = new File(filePath);
if (!file.getParentFile().exists()) {
Assert.assertTrue(file.getParentFile().mkdirs());
}
EnvironmentUtils.envSetUp();
+ sgInfo = new StorageGroupInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
MetadataManagerHelper.initMetadata();
context = EnvironmentUtils.TEST_QUERY_CONTEXT;
}
@@ -127,7 +130,7 @@ public class TsFileProcessorTest {
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertRowPlan(record));
+ processor.insert(buildInsertRowNodeByTSRecord(record));
}
// query data in memory
@@ -186,7 +189,7 @@ public class TsFileProcessorTest {
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertRowPlan(record));
+ processor.insert(buildInsertRowNodeByTSRecord(record));
}
// query data in memory
@@ -274,7 +277,7 @@ public class TsFileProcessorTest {
for (int i = 1; i <= 10; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertRowPlan(record));
+ processor.insert(buildInsertRowNodeByTSRecord(record));
}
processor.asyncFlush();
}
@@ -302,15 +305,25 @@ public class TsFileProcessorTest {
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.sgInfo.initTsFileProcessorInfo(processor);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
- processor.insertTablet(genInsertTablePlan(0, true), 0, 10, new TSStatus[10]);
+ // Test Tablet
+ processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10]);
IMemTable memTable = processor.getWorkMemTable();
Assert.assertEquals(828424, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTablePlan(100, true), 0, 10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(100, true), 0, 10, new TSStatus[10]);
Assert.assertEquals(828424, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTablePlan(200, true), 0, 10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(200, true), 0, 10, new TSStatus[10]);
Assert.assertEquals(828424, memTable.getTVListsRamCost());
Assert.assertEquals(90000, memTable.getTotalPointsNum());
Assert.assertEquals(720360, memTable.memSize());
+ // Test records
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ processor.insert(buildInsertRowNodeByTSRecord(record));
+ }
+ Assert.assertEquals(830120, memTable.getTVListsRamCost());
+ Assert.assertEquals(90100, memTable.getTotalPointsNum());
+ Assert.assertEquals(721560, memTable.memSize());
}
@Test
@@ -328,15 +341,25 @@ public class TsFileProcessorTest {
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.sgInfo.initTsFileProcessorInfo(processor);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
- processor.insertTablet(genInsertTablePlan(0, false), 0, 10, new TSStatus[10]);
+ // Test tablet
+ processor.insertTablet(genInsertTableNode(0, false), 0, 10, new TSStatus[10]);
IMemTable memTable = processor.getWorkMemTable();
Assert.assertEquals(1656000, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTablePlan(100, false), 0, 10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(100, false), 0, 10, new TSStatus[10]);
Assert.assertEquals(1656000, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTablePlan(200, false), 0, 10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(200, false), 0, 10, new TSStatus[10]);
Assert.assertEquals(1656000, memTable.getTVListsRamCost());
Assert.assertEquals(90000, memTable.getTotalPointsNum());
Assert.assertEquals(1440000, memTable.memSize());
+ // Test records
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ processor.insert(buildInsertRowNodeByTSRecord(record));
+ }
+ Assert.assertEquals(1657696, memTable.getTVListsRamCost());
+ Assert.assertEquals(90100, memTable.getTotalPointsNum());
+ Assert.assertEquals(1441200, memTable.memSize());
}
@Test
@@ -369,7 +392,7 @@ public class TsFileProcessorTest {
for (int i = 1; i <= 100; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
- processor.insert(new InsertRowPlan(record));
+ processor.insert(buildInsertRowNodeByTSRecord(record));
}
// query data in memory
@@ -414,25 +437,19 @@ public class TsFileProcessorTest {
}
}
- private InsertTabletPlan genInsertTablePlan(long startTime, boolean isAligned)
+ private InsertTabletNode genInsertTableNode(long startTime, boolean isAligned)
throws IllegalPathException {
String deviceId = "root.sg.device5";
String[] measurements = new String[3000];
- List<Integer> dataTypesList = new ArrayList<>();
TSDataType[] dataTypes = new TSDataType[3000];
TSEncoding[] encodings = new TSEncoding[3000];
- IMeasurementMNode[] mNodes = new IMeasurementMNode[3000];
+ MeasurementSchema[] schemas = new MeasurementSchema[3000];
for (int i = 0; i < 3000; i++) {
measurements[i] = "s" + i;
- dataTypesList.add(TSDataType.INT64.ordinal());
dataTypes[i] = TSDataType.INT64;
encodings[i] = TSEncoding.PLAIN;
- IMeasurementSchema schema =
- new MeasurementSchema(measurements[i], dataTypes[i], encodings[i]);
- mNodes[i] = MeasurementMNode.getMeasurementMNode(null, measurements[i], schema, null);
+ schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i], encodings[i]);
}
- InsertTabletPlan insertTabletPlan =
- new InsertTabletPlan(new PartialPath(deviceId), measurements, dataTypesList);
long[] times = new long[10];
Object[] columns = new Object[3000];
@@ -446,12 +463,15 @@ public class TsFileProcessorTest {
((long[]) columns[i])[(int) r] = r;
}
}
- insertTabletPlan.setTimes(times);
- insertTabletPlan.setColumns(columns);
- insertTabletPlan.setRowCount(times.length);
- insertTabletPlan.setMeasurementMNodes(mNodes);
- insertTabletPlan.setAligned(isAligned);
-
- return insertTabletPlan;
+ return new InsertTabletNode(
+ new QueryId("test_write").genPlanNodeId(),
+ new PartialPath(deviceId),
+ isAligned,
+ schemas,
+ dataTypes,
+ times,
+ null,
+ columns,
+ times.length);
}
}