You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/11 10:11:45 UTC
[iotdb] 01/01: fix level compaction loss data
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_level_compaction_loss_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64d676df004f5e67b70f1aac5108ca9b1c73b067
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 11 18:10:42 2021 +0800
fix level compaction loss data
---
.../engine/compaction/utils/CompactionUtils.java | 5 ++
.../compaction/LevelCompactionMergeTest.java | 68 ++++++++++++++++++++++
.../db/engine/compaction/LevelCompactionTest.java | 59 +++++++++++++++++++
3 files changed, 132 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index aaeaf81..4fafa9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -324,6 +324,11 @@ public class CompactionUtils {
allSensors.addAll(sensorChunkMetadataListMap.keySet());
}
+ // if there is no more chunkMetaData, merge all the sensors
+ if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+ lastSensor = Collections.max(allSensors);
+ }
+
for (String sensor : allSensors) {
if (sensor.compareTo(lastSensor) <= 0) {
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 579fd88..a71ef40 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -304,4 +304,72 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
+
+ @Test
+ public void testCompactionDiffTimeSeries()
+ throws IOException, WriteProcessException, IllegalPathException {
+ int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+ int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+ IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+ IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+ List<TsFileResource> compactionFiles = prepareTsFileResources();
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ levelCompactionTsFileManagement.addAll(compactionFiles, true);
+ QueryContext context = new QueryContext();
+ PartialPath path =
+ new PartialPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[1].getMeasurementId());
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[1].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ count++;
+ }
+ }
+ assertEquals(count, 1);
+
+ levelCompactionTsFileManagement.forkCurrentFileList(0);
+ CompactionMergeTask compactionOnePartitionUtil =
+ levelCompactionTsFileManagement
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+ compactionMergeWorking = true;
+ compactionOnePartitionUtil.call();
+ while (compactionMergeWorking) {
+ // wait
+ }
+ context = new QueryContext();
+ tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[1].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ count++;
+ }
+ }
+ assertEquals(count, 1);
+ IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+ IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index 94d496d..55c56b9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -231,4 +231,63 @@ abstract class LevelCompactionTest {
}
fileWriter.close();
}
+
+ List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+ List<TsFileResource> ret = new ArrayList<>();
+ // prepare file 1
+ File file1 =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ System.nanoTime()
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource1 = new TsFileResource(file1);
+ tsFileResource1.setClosed(true);
+ tsFileResource1.updatePlanIndexes((long) 0);
+ TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
+ fileWriter1.registerTimeseries(
+ new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()), measurementSchemas[0]);
+ TSRecord record1 = new TSRecord(0, deviceIds[0]);
+ record1.addTuple(
+ DataPoint.getDataPoint(
+ measurementSchemas[0].getType(),
+ measurementSchemas[0].getMeasurementId(),
+ String.valueOf(0)));
+ fileWriter1.write(record1);
+ fileWriter1.close();
+ // prepare file 2
+ File file2 =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ System.nanoTime()
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource2 = new TsFileResource(file2);
+ tsFileResource2.setClosed(true);
+ tsFileResource2.updatePlanIndexes((long) 1);
+ TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
+ fileWriter2.registerTimeseries(
+ new Path(deviceIds[0], measurementSchemas[1].getMeasurementId()), measurementSchemas[1]);
+ TSRecord record2 = new TSRecord(0, deviceIds[0]);
+ record2.addTuple(
+ DataPoint.getDataPoint(
+ measurementSchemas[1].getType(),
+ measurementSchemas[1].getMeasurementId(),
+ String.valueOf(0)));
+ fileWriter2.write(record2);
+ fileWriter2.close();
+ ret.add(tsFileResource1);
+ ret.add(tsFileResource2);
+ return ret;
+ }
}