You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/11 10:11:44 UTC

[iotdb] branch fix_level_compaction_loss_data created (now 64d676d)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_level_compaction_loss_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 64d676d  fix level compaction loss data

This branch includes the following new commits:

     new 64d676d  fix level compaction loss data

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix level compaction loss data

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_level_compaction_loss_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64d676df004f5e67b70f1aac5108ca9b1c73b067
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 11 18:10:42 2021 +0800

    fix level compaction loss data
---
 .../engine/compaction/utils/CompactionUtils.java   |  5 ++
 .../compaction/LevelCompactionMergeTest.java       | 68 ++++++++++++++++++++++
 .../db/engine/compaction/LevelCompactionTest.java  | 59 +++++++++++++++++++
 3 files changed, 132 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index aaeaf81..4fafa9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -324,6 +324,11 @@ public class CompactionUtils {
             allSensors.addAll(sensorChunkMetadataListMap.keySet());
           }
 
+          // if there is no more chunkMetaData, merge all the sensors
+          if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+            lastSensor = Collections.max(allSensors);
+          }
+
           for (String sensor : allSensors) {
             if (sensor.compareTo(lastSensor) <= 0) {
               Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 579fd88..a71ef40 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -304,4 +304,72 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
       boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
+
+  @Test
+  public void testCompactionDiffTimeSeries()
+      throws IOException, WriteProcessException, IllegalPathException {
+    int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+    int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+    List<TsFileResource> compactionFiles = prepareTsFileResources();
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+    levelCompactionTsFileManagement.addAll(compactionFiles, true);
+    QueryContext context = new QueryContext();
+    PartialPath path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[1].getMeasurementId());
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[1].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    int count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        count++;
+      }
+    }
+    assertEquals(count, 1);
+
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionOnePartitionUtil =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionOnePartitionUtil.call();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    context = new QueryContext();
+    tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[1].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        count++;
+      }
+    }
+    assertEquals(count, 1);
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index 94d496d..55c56b9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -231,4 +231,63 @@ abstract class LevelCompactionTest {
     }
     fileWriter.close();
   }
+
+  List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+    List<TsFileResource> ret = new ArrayList<>();
+    // prepare file 1
+    File file1 =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                System.nanoTime()
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource1 = new TsFileResource(file1);
+    tsFileResource1.setClosed(true);
+    tsFileResource1.updatePlanIndexes((long) 0);
+    TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile());
+    fileWriter1.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()), measurementSchemas[0]);
+    TSRecord record1 = new TSRecord(0, deviceIds[0]);
+    record1.addTuple(
+        DataPoint.getDataPoint(
+            measurementSchemas[0].getType(),
+            measurementSchemas[0].getMeasurementId(),
+            String.valueOf(0)));
+    fileWriter1.write(record1);
+    fileWriter1.close();
+    // prepare file 2
+    File file2 =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                System.nanoTime()
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource2 = new TsFileResource(file2);
+    tsFileResource2.setClosed(true);
+    tsFileResource2.updatePlanIndexes((long) 1);
+    TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile());
+    fileWriter2.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[1].getMeasurementId()), measurementSchemas[1]);
+    TSRecord record2 = new TSRecord(0, deviceIds[0]);
+    record2.addTuple(
+        DataPoint.getDataPoint(
+            measurementSchemas[1].getType(),
+            measurementSchemas[1].getMeasurementId(),
+            String.valueOf(0)));
+    fileWriter2.write(record2);
+    fileWriter2.close();
+    ret.add(tsFileResource1);
+    ret.add(tsFileResource2);
+    return ret;
+  }
 }