You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/11 09:44:34 UTC

[iotdb] branch rel/0.11 updated: [IOTDB-1432]fix level compaction loss data (#3388)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 30c9f12   [IOTDB-1432]fix level compaction loss data (#3388)
30c9f12 is described below

commit 30c9f1228d1fb228092f4f849b9bc53e38fde336
Author: gwmh <16...@qq.com>
AuthorDate: Fri Jun 11 17:44:04 2021 +0800

     [IOTDB-1432]fix level compaction loss data (#3388)
---
 .../engine/compaction/utils/CompactionUtils.java   |  5 ++-
 .../compaction/LevelCompactionMergeTest.java       | 50 ++++++++++++++++++++++
 .../db/engine/compaction/LevelCompactionTest.java  | 43 +++++++++++++++++++
 3 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 822b149..e5ea202 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -318,7 +318,10 @@ public class CompactionUtils {
             // get all sensor used later
             allSensors.addAll(sensorChunkMetadataListMap.keySet());
           }
-
+          // if there is no more chunkMetaData, merge all the sensors
+          if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+            lastSensor = Collections.max(allSensors);
+          }
           for (String sensor : allSensors) {
             if (sensor.compareTo(lastSensor) <= 0) {
               Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index f687957..6656993 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -66,6 +66,56 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
     FileUtils.deleteDirectory(tempSGDir);
   }
 
+  /** IOTDB-1432: a series present in only one of the merged files must survive compaction. */
+  @Test
+  public void testCompactionDiffTimeSeries()
+      throws IOException, WriteProcessException, IllegalPathException {
+    int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+    int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+    try {
+      List<TsFileResource> compactionFiles = prepareTsFileResources();
+      LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+          new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+      levelCompactionTsFileManagement.addAll(compactionFiles, true);
+      QueryContext context = new QueryContext();
+      PartialPath path = new PartialPath(
+          deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[1].getMeasurementId());
+      IBatchReader tsFilesReader = new SeriesRawDataBatchReader(
+          path, measurementSchemas[1].getType(), context,
+          levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true);
+      int count = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        count += batchData.length();
+      }
+      assertEquals(1, count);
+
+      levelCompactionTsFileManagement.forkCurrentFileList(0);
+      CompactionOnePartitionUtil compactionOnePartitionUtil = levelCompactionTsFileManagement
+          .new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, 0);
+      compactionMergeWorking = true;
+      compactionOnePartitionUtil.run();
+      while (compactionMergeWorking) {
+        // spin until the merge callback clears the flag (presumably closeCompactionMergeCallBack)
+      }
+      context = new QueryContext();
+      tsFilesReader = new SeriesRawDataBatchReader(
+          path, measurementSchemas[1].getType(), context,
+          levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true);
+      count = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        count += batchData.length();
+      }
+      assertEquals(1, count);
+    } finally { // restore the shared config even when an assertion above fails
+      IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+      IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+    }
+  }
+
   /**
    * just compaction once
    */
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index dc71ea4..dfa69d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -204,4 +204,47 @@ abstract class LevelCompactionTest {
     fileWriter.close();
   }
 
+  List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+    List<TsFileResource> ret = new ArrayList<>();
+    // file 1: closed resource, plan index 0, one record for deviceIds[0] holding only measurementSchemas[0]
+    File file1 = new File(
+        TestConstant.BASE_OUTPUT_PATH.concat(
+            System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+                + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+                + ".tsfile"));
+    TsFileResource tsFileResource1 = new TsFileResource(file1);
+    tsFileResource1.setClosed(true);
+    tsFileResource1.updatePlanIndexes((long) 0);
+    TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
+    fileWriter1.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()), measurementSchemas[0]);
+    TSRecord record1 = new TSRecord(0, deviceIds[0]);
+    record1.addTuple(DataPoint
+        .getDataPoint(measurementSchemas[0].getType(), measurementSchemas[0].getMeasurementId(),
+            String.valueOf(0)));
+    fileWriter1.write(record1);
+    fileWriter1.close();
+    // file 2: same device, plan index 1, but its single record holds only measurementSchemas[1]
+    File file2 = new File(
+        TestConstant.BASE_OUTPUT_PATH.concat(
+            System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 1
+                + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+                + ".tsfile"));
+    TsFileResource tsFileResource2 = new TsFileResource(file2);
+    tsFileResource2.setClosed(true);
+    tsFileResource2.updatePlanIndexes((long) 1);
+    TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
+    fileWriter2.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[1].getMeasurementId()), measurementSchemas[1]);
+    TSRecord record2 = new TSRecord(0, deviceIds[0]);
+    record2.addTuple(DataPoint
+        .getDataPoint(measurementSchemas[1].getType(), measurementSchemas[1].getMeasurementId(),
+            String.valueOf(0)));
+    fileWriter2.write(record2);
+    fileWriter2.close();
+    ret.add(tsFileResource1);
+    ret.add(tsFileResource2);
+    return ret;
+  }
+
 }
\ No newline at end of file