You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/06/12 01:43:12 UTC

[iotdb] branch master updated: [IOTDB-1432] fix level compaction loss data (#3396)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 03f534d  [IOTDB-1432] fix level compaction loss data (#3396)
03f534d is described below

commit 03f534d4a9516c37990982ea8072df9d4d43d88e
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Fri Jun 11 20:42:35 2021 -0500

    [IOTDB-1432] fix level compaction loss data (#3396)
---
 .../engine/compaction/utils/CompactionUtils.java   |  5 ++
 .../compaction/LevelCompactionMergeTest.java       | 68 ++++++++++++++++++++++
 .../db/engine/compaction/LevelCompactionTest.java  | 59 +++++++++++++++++++
 3 files changed, 132 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 58e92ed..95d9f5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -324,6 +324,11 @@ public class CompactionUtils {
             allSensors.addAll(sensorChunkMetadataListMap.keySet());
           }
 
+          // if there is no more chunkMetaData, merge all the sensors
+          if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+            lastSensor = Collections.max(allSensors);
+          }
+
           for (String sensor : allSensors) {
             if (sensor.compareTo(lastSensor) <= 0) {
               Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 579fd88..a71ef40 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -304,4 +304,72 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
       boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
+
+  @Test
+  public void testCompactionDiffTimeSeries()
+      throws IOException, WriteProcessException, IllegalPathException {
+    int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+    int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+    List<TsFileResource> compactionFiles = prepareTsFileResources();
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+    levelCompactionTsFileManagement.addAll(compactionFiles, true);
+    QueryContext context = new QueryContext();
+    PartialPath path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[1].getMeasurementId());
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[1].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    int count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        count++;
+      }
+    }
+    assertEquals(count, 1);
+
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionOnePartitionUtil =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionOnePartitionUtil.call();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    context = new QueryContext();
+    tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[1].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        count++;
+      }
+    }
+    assertEquals(count, 1);
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index 94d496d..55c56b9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -231,4 +231,63 @@ abstract class LevelCompactionTest {
     }
     fileWriter.close();
   }
+
+  List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+    List<TsFileResource> ret = new ArrayList<>();
+    // prepare file 1
+    File file1 =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                System.nanoTime()
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource1 = new TsFileResource(file1);
+    tsFileResource1.setClosed(true);
+    tsFileResource1.updatePlanIndexes((long) 0);
+    TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
+    fileWriter1.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()), measurementSchemas[0]);
+    TSRecord record1 = new TSRecord(0, deviceIds[0]);
+    record1.addTuple(
+        DataPoint.getDataPoint(
+            measurementSchemas[0].getType(),
+            measurementSchemas[0].getMeasurementId(),
+            String.valueOf(0)));
+    fileWriter1.write(record1);
+    fileWriter1.close();
+    // prepare file 2
+    File file2 =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                System.nanoTime()
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource2 = new TsFileResource(file2);
+    tsFileResource2.setClosed(true);
+    tsFileResource2.updatePlanIndexes((long) 1);
+    TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
+    fileWriter2.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[1].getMeasurementId()), measurementSchemas[1]);
+    TSRecord record2 = new TSRecord(0, deviceIds[0]);
+    record2.addTuple(
+        DataPoint.getDataPoint(
+            measurementSchemas[1].getType(),
+            measurementSchemas[1].getMeasurementId(),
+            String.valueOf(0)));
+    fileWriter2.write(record2);
+    fileWriter2.close();
+    ret.add(tsFileResource1);
+    ret.add(tsFileResource2);
+    return ret;
+  }
 }