You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2024/03/22 13:54:00 UTC
(iotdb) branch master updated: Allow series data type not consistent in compaction
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9feffb1510d Allow series data type not consistent in compaction
9feffb1510d is described below
commit 9feffb1510d3b4b2d0b5a21cc9f3ef811fe619aa
Author: shuwenwei <55...@users.noreply.github.com>
AuthorDate: Fri Mar 22 21:53:55 2024 +0800
Allow series data type not consistent in compaction
---
.../impl/ReadChunkCompactionPerformer.java | 47 +++++
.../execute/utils/MultiTsFileDeviceIterator.java | 16 +-
.../fast/AlignedSeriesCompactionExecutor.java | 22 ++-
.../ReadChunkAlignedSeriesCompactionExecutor.java | 13 +-
.../compaction/io/CompactionTsFileReader.java | 51 +++++
.../compaction/CompactionDataTypeNotMatchTest.java | 212 +++++++++++++++++++++
...nkCompactionPerformerWithAlignedSeriesTest.java | 2 +-
7 files changed, 354 insertions(+), 9 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index dd8ad9b00e5..569d97910fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -184,6 +185,9 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
// dead-loop.
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
+ // remove the chunk metadata whose data type does not match the data type of the last chunk
+ readerAndChunkMetadataList =
+ filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
new SingleSeriesCompactionExecutor(
p, readerAndChunkMetadataList, writer, targetResource, summary);
@@ -192,6 +196,49 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
writer.endChunkGroup();
}
+ /**
+  * Filters out the entries whose chunk metadata of the current series do not agree with the
+  * reference data type.
+  *
+  * <p>The reference data type is the data type of the first non-null chunk metadata found while
+  * scanning the entries from the tail of the list towards its head. An entry is discarded as a
+  * whole as soon as one of its non-null chunk metadata has a different data type. Entries whose
+  * chunk metadata list is null or contains no non-null element are kept, because they hold
+  * nothing that contradicts the reference data type.
+  *
+  * @param readerAndChunkMetadataList readers paired with the chunk metadata of the current series
+  * @return the input list itself when it is empty or no reference data type can be determined,
+  *     otherwise a new list that keeps the original order and only contains consistent entries
+  */
+ private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+     filterDataTypeNotMatchedChunkMetadata(
+         LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList) {
+   if (readerAndChunkMetadataList.isEmpty()) {
+     return readerAndChunkMetadataList;
+   }
+   // find correct data type: walk from the tail with an iterator, since LinkedList.get(i) has to
+   // traverse the list on every call
+   TSDataType correctDataType = null;
+   Iterable<Pair<TsFileSequenceReader, List<ChunkMetadata>>> entriesFromTail =
+       readerAndChunkMetadataList::descendingIterator;
+   for (Pair<TsFileSequenceReader, List<ChunkMetadata>> entry : entriesFromTail) {
+     List<ChunkMetadata> chunkMetadataList = entry.getRight();
+     if (chunkMetadataList == null) {
+       continue;
+     }
+     for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+       if (chunkMetadata != null) {
+         // only the first non-null chunk metadata of an entry is consulted
+         correctDataType = chunkMetadata.getDataType();
+         break;
+       }
+     }
+     if (correctDataType != null) {
+       break;
+     }
+   }
+   if (correctDataType == null) {
+     return readerAndChunkMetadataList;
+   }
+   // check data type consistent and skip compact files with wrong data type
+   LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new LinkedList<>();
+   for (Pair<TsFileSequenceReader, List<ChunkMetadata>> entry : readerAndChunkMetadataList) {
+     List<ChunkMetadata> chunkMetadataList = entry.getRight();
+     boolean dataTypeConsistent = true;
+     // a null list is tolerated here exactly as it is while searching for the reference type
+     if (chunkMetadataList != null) {
+       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+         if (chunkMetadata != null && chunkMetadata.getDataType() != correctDataType) {
+           dataTypeConsistent = false;
+           break;
+         }
+       }
+     }
+     if (dataTypeConsistent) {
+       result.add(entry);
+     }
+   }
+   return result;
+ }
+
+
@Override
public void setSourceFiles(List<TsFileResource> seqFiles) {
this.seqFiles = seqFiles;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index d80ba2fab37..c6241c99a9c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -247,6 +248,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
getTimeseriesMetadataOffsetOfCurrentDevice() throws IOException {
Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
new HashMap<>();
+ Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
for (TsFileResource resource : tsFileResourcesSortedByDesc) {
if (!deviceIteratorMap.containsKey(resource)
|| !deviceIteratorMap.get(resource).current().equals(currentDevice)) {
@@ -255,14 +257,22 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
continue;
}
TsFileSequenceReader reader = readerMap.get(resource);
- for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> entrySet :
- reader
- .getTimeseriesMetadataOffsetByDevice(
+ for (Map.Entry<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> entrySet :
+ ((CompactionTsFileReader) reader)
+ .getTimeseriesMetadataAndOffsetByDevice(
deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(),
Collections.emptySet(),
false)
.entrySet()) {
String measurementId = entrySet.getKey();
+ // skip the TimeseriesMetadata whose data type is not consistent
+ TSDataType dataTypeOfCurrentTimeseriesMetadata = entrySet.getValue().left.getTsDataType();
+ TSDataType correctDataTypeOfCurrentMeasurement =
+ measurementDataTypeMap.putIfAbsent(measurementId, dataTypeOfCurrentTimeseriesMetadata);
+ if (correctDataTypeOfCurrentMeasurement != null
+ && correctDataTypeOfCurrentMeasurement != dataTypeOfCurrentTimeseriesMetadata) {
+ continue;
+ }
timeseriesMetadataOffsetMap.putIfAbsent(measurementId, new HashMap<>());
timeseriesMetadataOffsetMap.get(measurementId).put(resource, entrySet.getValue().right);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index d3a26201787..9cfd7cee948 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -169,11 +169,16 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
valueChunkMetadatas.add(null);
} else {
// current file contains this aligned timeseries
- valueChunkMetadatas.add(
+ List<IChunkMetadata> valueColumnChunkMetadataList =
readerCacheMap
.get(resource)
.getChunkMetadataListByTimeseriesMetadataOffset(
- timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right));
+ timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right);
+ if (isValueChunkDataTypeMatchSchema(valueColumnChunkMetadataList)) {
+ valueChunkMetadatas.add(valueColumnChunkMetadataList);
+ } else {
+ valueChunkMetadatas.add(null);
+ }
}
}
}
@@ -239,6 +244,19 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor {
}
}
+ /**
+  * Checks whether the chunks of one value column carry the data type recorded in
+  * {@code measurementSchemaMap} for that measurement.
+  *
+  * <p>Only the first non-null chunk metadata of the list is inspected; its result decides for the
+  * whole list.
+  *
+  * @param chunkMetadataListOfOneValueColumn chunk metadata of a single value column, may contain
+  *     null elements
+  * @return {@code true} if the list holds no non-null element or the first non-null element has
+  *     the same data type as the schema of its measurement, {@code false} otherwise
+  */
+ private boolean isValueChunkDataTypeMatchSchema(
+     List<IChunkMetadata> chunkMetadataListOfOneValueColumn) {
+   for (IChunkMetadata candidate : chunkMetadataListOfOneValueColumn) {
+     if (candidate != null) {
+       // the first non-null chunk metadata is representative for the column
+       return measurementSchemaMap.get(candidate.getMeasurementUid()).getType()
+           == candidate.getDataType();
+     }
+   }
+   // nothing to compare against the schema
+   return true;
+ }
+
+
/**
* Deserialize chunk into pages without uncompressing and put them into the page queue.
*
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index 29768e6ff5e..d5f0785980d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -103,8 +103,9 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
private void collectValueColumnSchemaList() throws IOException {
Map<String, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
- for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair :
- this.readerAndChunkMetadataList) {
+ for (int i = this.readerAndChunkMetadataList.size() - 1; i >= 0; i--) {
+ Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair =
+ this.readerAndChunkMetadataList.get(i);
CompactionTsFileReader reader = (CompactionTsFileReader) pair.getLeft();
List<AlignedChunkMetadata> alignedChunkMetadataList = pair.getRight();
for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
@@ -184,7 +185,7 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
Collections.fill(valueChunks, getChunkLoader(reader, null));
long pointNum = 0;
for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
- if (chunkMetadata == null) {
+ if (chunkMetadata == null || !isValueChunkDataTypeMatchSchema(chunkMetadata)) {
continue;
}
pointNum += chunkMetadata.getStatistics().getCount();
@@ -201,6 +202,12 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
}
}
+ private boolean isValueChunkDataTypeMatchSchema(IChunkMetadata valueChunkMetadata) {
+ String measurement = valueChunkMetadata.getMeasurementUid();
+ IMeasurementSchema schema = schemaList.get(measurementSchemaListIndexMap.get(measurement));
+ return schema.getType() == valueChunkMetadata.getDataType();
+ }
+
private ChunkLoader getChunkLoader(TsFileSequenceReader reader, ChunkMetadata chunkMetadata)
throws IOException {
if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() == 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 476c8d19c89..f4231d378de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -22,12 +22,14 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.tsfile.file.IMetadataIndexEntry;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -36,6 +38,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -182,6 +185,54 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
return result;
}
+ /**
+  * Reads every TimeseriesMetadata reachable from the given measurement index node and returns it
+  * together with the range it was deserialized from.
+  *
+  * <p>The growth of {@code readDataSize} during the whole traversal is reported once to {@link
+  * CompactionMetrics} as {@link CompactionIoDataType#METADATA}. The traversal itself is done by a
+  * private helper, so that data read below internal nodes is not reported again by every
+  * enclosing level of the recursion.
+  *
+  * @param measurementNode measurement index node to start from
+  * @param excludedMeasurementIds passed through to {@code TimeseriesMetadata.deserializeFrom}
+  * @param needChunkMetadata passed through to {@code TimeseriesMetadata.deserializeFrom}
+  * @return map in encounter order from measurement id to the TimeseriesMetadata and a pair of
+  *     (start, end) offsets, each computed as the start offset of the range that was read plus
+  *     the buffer position before and after deserializing that TimeseriesMetadata
+  * @throws IOException if reading or deserializing the metadata fails
+  */
+ public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
+     getTimeseriesMetadataAndOffsetByDevice(
+         MetadataIndexNode measurementNode,
+         Set<String> excludedMeasurementIds,
+         boolean needChunkMetadata)
+         throws IOException {
+   long before = readDataSize.get();
+   Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
+       new LinkedHashMap<>();
+   collectTimeseriesMetadataAndOffset(
+       measurementNode, excludedMeasurementIds, needChunkMetadata, timeseriesMetadataOffsetMap);
+
+   // record once for the whole traversal, nested levels included
+   long dataSize = readDataSize.get() - before;
+   CompactionMetrics.getInstance()
+       .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize);
+   return timeseriesMetadataOffsetMap;
+ }
+
+ /**
+  * Recursively walks the measurement index tree below {@code measurementNode} and stores every
+  * TimeseriesMetadata with its offsets into {@code timeseriesMetadataOffsetMap}.
+  */
+ private void collectTimeseriesMetadataAndOffset(
+     MetadataIndexNode measurementNode,
+     Set<String> excludedMeasurementIds,
+     boolean needChunkMetadata,
+     Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>> timeseriesMetadataOffsetMap)
+     throws IOException {
+   List<IMetadataIndexEntry> childrenEntryList = measurementNode.getChildren();
+   for (int i = 0; i < childrenEntryList.size(); i++) {
+     long startOffset = childrenEntryList.get(i).getOffset();
+     // a child ends where its next sibling starts; the last child ends with the node itself
+     long endOffset =
+         i == childrenEntryList.size() - 1
+             ? measurementNode.getEndOffset()
+             : childrenEntryList.get(i + 1).getOffset();
+     ByteBuffer nextBuffer = readData(startOffset, endOffset);
+     if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+       // leaf measurement node
+       while (nextBuffer.hasRemaining()) {
+         int metadataStartOffset = nextBuffer.position();
+         TimeseriesMetadata timeseriesMetadata =
+             TimeseriesMetadata.deserializeFrom(
+                 nextBuffer, excludedMeasurementIds, needChunkMetadata);
+         timeseriesMetadataOffsetMap.put(
+             timeseriesMetadata.getMeasurementId(),
+             new Pair<>(
+                 timeseriesMetadata,
+                 new Pair<>(
+                     startOffset + metadataStartOffset, startOffset + nextBuffer.position())));
+       }
+     } else {
+       // internal measurement node
+       MetadataIndexNode nextLayerMeasurementNode =
+           MetadataIndexNode.deserializeFrom(nextBuffer, false);
+       collectTimeseriesMetadataAndOffset(
+           nextLayerMeasurementNode,
+           excludedMeasurementIds,
+           needChunkMetadata,
+           timeseriesMetadataOffsetMap);
+     }
+   }
+ }
+
+
@Override
public void getDeviceTimeseriesMetadata(
List<TimeseriesMetadata> timeseriesMetadataList,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java
new file mode 100644
index 00000000000..c07616a4ce5
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests inner space compaction of sequence files in which the same series was written with
+ * different data types.
+ *
+ * <p>Every test writes two files for one device: the first holds a single point at time 1, the
+ * second a single point at time 2 with another data type. After compaction the start time of the
+ * device in the first sequence file is expected to be 2.
+ */
+public class CompactionDataTypeNotMatchTest extends AbstractCompactionTest {
+  private final String oldThreadName = Thread.currentThread().getName();
+  private final IDeviceID device = new PlainDeviceID(COMPACTION_TEST_SG + ".d1");
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    super.setUp();
+    Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    try {
+      super.tearDown();
+    } finally {
+      // restore the thread name even if the clean-up of the parent class fails
+      Thread.currentThread().setName(oldThreadName);
+    }
+  }
+
+  @Test
+  public void testCompactNonAlignedSeriesWithReadChunkCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithNonAlignedSeries();
+    compactAndVerify(
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0));
+  }
+
+  @Test
+  public void testCompactNonAlignedSeriesWithFastCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithNonAlignedSeries();
+    compactAndVerify(
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0));
+  }
+
+  @Test
+  public void testCompactNonAlignedSeriesWithReadPointCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithNonAlignedSeries();
+    compactAndVerify(
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0));
+  }
+
+  @Test
+  public void testCompactAlignedSeriesWithReadChunkCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithAlignedSeries();
+    compactAndVerify(
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0));
+  }
+
+  @Test
+  public void testCompactAlignedSeriesWithFastCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithAlignedSeries();
+    compactAndVerify(
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0));
+  }
+
+  @Test
+  public void testCompactAlignedSeriesWithReadPointCompactionPerformer()
+      throws IOException, WriteProcessException {
+    generateDataTypeNotMatchFilesWithAlignedSeries();
+    compactAndVerify(
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0));
+  }
+
+  /**
+   * Runs the task and checks that it succeeds, that the first sequence file is valid and that the
+   * device starts at time 2 in it.
+   */
+  private void compactAndVerify(InnerSpaceCompactionTask task)
+      throws IOException, WriteProcessException {
+    Assert.assertTrue(task.start());
+    TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0));
+    Assert.assertEquals(2, tsFileManager.getTsFileList(true).get(0).getStartTime(device));
+  }
+
+  /** Creates an empty sequence file whose resource is marked as COMPACTING. */
+  private TsFileResource createCompactingSeqResource() throws IOException, WriteProcessException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    resource.setStatusForTest(TsFileResourceStatus.COMPACTING);
+    return resource;
+  }
+
+  /**
+   * Sets start and end time of the device to the given timestamp, serializes the resource and adds
+   * it to the sequence resources used by the tests.
+   */
+  private void registerSeqResource(TsFileResource resource, long timestamp)
+      throws IOException, WriteProcessException {
+    resource.updateStartTime(device, timestamp);
+    resource.updateEndTime(device, timestamp);
+    resource.serialize();
+    seqResources.add(resource);
+  }
+
+  /** Writes s1 as BOOLEAN at time 1 into the first file and as INT32 at time 2 into the second. */
+  private void generateDataTypeNotMatchFilesWithNonAlignedSeries()
+      throws IOException, WriteProcessException {
+    MeasurementSchema measurementSchema1 = new MeasurementSchema("s1", TSDataType.BOOLEAN);
+    TsFileResource resource1 = createCompactingSeqResource();
+    try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) {
+      writer.registerTimeseries(new Path(device), measurementSchema1);
+      TSRecord record = new TSRecord(1, device);
+      record.addTuple(new BooleanDataPoint("s1", true));
+      writer.write(record);
+      writer.flushAllChunkGroups();
+    }
+    registerSeqResource(resource1, 1);
+
+    MeasurementSchema measurementSchema2 = new MeasurementSchema("s1", TSDataType.INT32);
+    TsFileResource resource2 = createCompactingSeqResource();
+    try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) {
+      writer.registerTimeseries(new Path(device), measurementSchema2);
+      TSRecord record = new TSRecord(2, device);
+      record.addTuple(new IntDataPoint("s1", 10));
+      writer.write(record);
+      writer.flushAllChunkGroups();
+    }
+    registerSeqResource(resource2, 2);
+  }
+
+  /**
+   * Writes aligned s1 and s2 as INT32 at time 1 into the first file and as BOOLEAN at time 2 into
+   * the second.
+   */
+  private void generateDataTypeNotMatchFilesWithAlignedSeries()
+      throws IOException, WriteProcessException {
+    List<MeasurementSchema> measurementSchemas1 = new ArrayList<>();
+    measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32));
+    measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32));
+
+    TsFileResource resource1 = createCompactingSeqResource();
+    try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) {
+      writer.registerAlignedTimeseries(new Path(device), measurementSchemas1);
+      TSRecord record = new TSRecord(1, device);
+      record.addTuple(new IntDataPoint("s1", 0));
+      record.addTuple(new IntDataPoint("s2", 1));
+      writer.writeAligned(record);
+      writer.flushAllChunkGroups();
+    }
+    registerSeqResource(resource1, 1);
+
+    List<MeasurementSchema> measurementSchemas2 = new ArrayList<>();
+    measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.BOOLEAN));
+    measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.BOOLEAN));
+    TsFileResource resource2 = createCompactingSeqResource();
+    try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) {
+      writer.registerAlignedTimeseries(new Path(device), measurementSchemas2);
+      TSRecord record = new TSRecord(2, device);
+      record.addTuple(new BooleanDataPoint("s1", true));
+      record.addTuple(new BooleanDataPoint("s2", true));
+      writer.writeAligned(record);
+      writer.flushAllChunkGroups();
+    }
+    registerSeqResource(resource2, 2);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
index 5b840c8dad8..8c925424b38 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
@@ -625,7 +625,7 @@ public class NewReadChunkCompactionPerformerWithAlignedSeriesTest extends Abstra
performer.perform();
CompactionUtils.moveTargetFile(
Collections.singletonList(targetResource), true, COMPACTION_TEST_SG);
- Assert.assertEquals(8, summary.getDirectlyFlushChunkNum());
+ Assert.assertEquals(16, summary.getDirectlyFlushChunkNum());
Assert.assertEquals(0, summary.getDirectlyFlushPageCount());
TsFileResourceUtils.validateTsFileDataCorrectness(targetResource);
Assert.assertEquals(