You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/06/10 17:20:32 UTC
[iotdb] 01/01: fix bug and add UT
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch fix_compaction_loss_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 31e4257c93040856fc0fb034381db8e1320f594a
Author: EJTTianyu <16...@qq.com>
AuthorDate: Fri Jun 11 01:19:49 2021 +0800
fix bug and add UT
---
.../engine/compaction/utils/CompactionUtils.java | 5 ++-
.../compaction/LevelCompactionMergeTest.java | 50 ++++++++++++++++++++++
.../db/engine/compaction/LevelCompactionTest.java | 43 +++++++++++++++++++
3 files changed, 97 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 822b149..e5ea202 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -318,7 +318,10 @@ public class CompactionUtils {
// get all sensor used later
allSensors.addAll(sensorChunkMetadataListMap.keySet());
}
-
+ // if there is no more chunkMetaData, merge all the sensors
+ if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+ lastSensor = Collections.max(allSensors);
+ }
for (String sensor : allSensors) {
if (sensor.compareTo(lastSensor) <= 0) {
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index f687957..6656993 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -66,6 +66,56 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
FileUtils.deleteDirectory(tempSGDir);
}
+ @Test
+ public void testCompactionDiffTimeSeries()
+ throws IOException, WriteProcessException, IllegalPathException {
+ int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+ int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+ IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+ IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+ List<TsFileResource> compactionFiles = prepareTsFileResources();
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement = new LevelCompactionTsFileManagement(
+ COMPACTION_TEST_SG, tempSGDir.getPath());
+ levelCompactionTsFileManagement.addAll(compactionFiles, true);
+ QueryContext context = new QueryContext();
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[1].getMeasurementId());
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[1].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ count++;
+ }
+ }
+ assertEquals(count, 1);
+
+ levelCompactionTsFileManagement.forkCurrentFileList(0);
+ CompactionOnePartitionUtil compactionOnePartitionUtil = levelCompactionTsFileManagement.new CompactionOnePartitionUtil(
+ this::closeCompactionMergeCallBack, 0);
+ compactionMergeWorking = true;
+ compactionOnePartitionUtil.run();
+ while (compactionMergeWorking) {
+ //wait
+ }
+ context = new QueryContext();
+ tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[1].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true);
+ count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ count++;
+ }
+ }
+ assertEquals(count, 1);
+ IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+ IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+ }
+
/**
* just compaction once
*/
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index dc71ea4..dfa69d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -204,4 +204,47 @@ abstract class LevelCompactionTest {
fileWriter.close();
}
+ List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+ List<TsFileResource> ret = new ArrayList<>();
+ // prepare file 1
+ File file1 = new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource1 = new TsFileResource(file1);
+ tsFileResource1.setClosed(true);
+ tsFileResource1.updatePlanIndexes((long) 0);
+ TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
+ fileWriter1.registerTimeseries(
+ new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()), measurementSchemas[0]);
+ TSRecord record1 = new TSRecord(0, deviceIds[0]);
+ record1.addTuple(DataPoint
+ .getDataPoint(measurementSchemas[0].getType(), measurementSchemas[0].getMeasurementId(),
+ String.valueOf(0)));
+ fileWriter1.write(record1);
+ fileWriter1.close();
+ // prepare file 2
+ File file2 = new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource2 = new TsFileResource(file2);
+ tsFileResource2.setClosed(true);
+ tsFileResource2.updatePlanIndexes((long) 1);
+ TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
+ fileWriter2.registerTimeseries(
+ new Path(deviceIds[0], measurementSchemas[1].getMeasurementId()), measurementSchemas[1]);
+ TSRecord record2 = new TSRecord(0, deviceIds[0]);
+ record2.addTuple(DataPoint
+ .getDataPoint(measurementSchemas[1].getType(), measurementSchemas[1].getMeasurementId(),
+ String.valueOf(0)));
+ fileWriter2.write(record2);
+ fileWriter2.close();
+ ret.add(tsFileResource1);
+ ret.add(tsFileResource2);
+ return ret;
+ }
+
}
\ No newline at end of file