You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/11 10:15:49 UTC

[iotdb] branch fix_level_compaction_loss_data_0.13 created (now e9631fb)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_level_compaction_loss_data_0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at e9631fb  fix level compaction loss data

This branch includes the following new commits:

     new e9631fb  fix level compaction loss data

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix level compaction loss data

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_level_compaction_loss_data_0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e9631fbb626d546185bf972a9790ccd6233ff9f2
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 11 18:15:18 2021 +0800

    fix level compaction loss data
---
 .../engine/compaction/utils/CompactionUtils.java   |  5 ++
 .../compaction/LevelCompactionMergeTest.java       | 68 ++++++++++++++++++++++
 .../db/engine/compaction/LevelCompactionTest.java  | 59 +++++++++++++++++++
 3 files changed, 132 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 58e92ed..95d9f5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -324,6 +324,11 @@ public class CompactionUtils {
             allSensors.addAll(sensorChunkMetadataListMap.keySet());
           }
 
+          // if there is no more chunkMetaData, merge all the sensors
+          if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+            lastSensor = Collections.max(allSensors);
+          }
+
           for (String sensor : allSensors) {
             if (sensor.compareTo(lastSensor) <= 0) {
               Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 579fd88..a71ef40 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -304,4 +304,72 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
       boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
+
+  /** Checks that compacting files with different series of one device keeps every point. */
+  @Test
+  public void testCompactionDiffTimeSeries()
+      throws IOException, WriteProcessException, IllegalPathException {
+    int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+    int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+    try {
+      List<TsFileResource> compactionFiles = prepareTsFileResources();
+      LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+          new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+      levelCompactionTsFileManagement.addAll(compactionFiles, true);
+      QueryContext context = new QueryContext();
+      PartialPath path =
+          new PartialPath(
+              deviceIds[0]
+                  + TsFileConstant.PATH_SEPARATOR
+                  + measurementSchemas[1].getMeasurementId());
+      IBatchReader tsFilesReader =
+          new SeriesRawDataBatchReader(
+              path,
+              measurementSchemas[1].getType(),
+              context,
+              levelCompactionTsFileManagement.getTsFileList(true),
+              new ArrayList<>(),
+              null,
+              null,
+              true);
+      int count = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        count += tsFilesReader.nextBatch().length();
+      }
+      assertEquals(1, count);
+
+      levelCompactionTsFileManagement.forkCurrentFileList(0);
+      CompactionMergeTask compactionOnePartitionUtil =
+          levelCompactionTsFileManagement
+          .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+      compactionMergeWorking = true;
+      compactionOnePartitionUtil.call();
+      while (compactionMergeWorking) {
+        // wait
+      }
+      // the series must still yield exactly one point once the files are merged
+      context = new QueryContext();
+      tsFilesReader =
+          new SeriesRawDataBatchReader(
+              path,
+              measurementSchemas[1].getType(),
+              context,
+              levelCompactionTsFileManagement.getTsFileList(true),
+              new ArrayList<>(),
+              null,
+              null,
+              true);
+      count = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        count += tsFilesReader.nextBatch().length();
+      }
+      assertEquals(1, count);
+    } finally {
+      // restore the previous configuration values even when an assertion above fails
+      IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+      IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index 94d496d..55c56b9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -231,4 +231,63 @@ abstract class LevelCompactionTest {
     }
     fileWriter.close();
   }
+
+  /**
+   * Builds two closed TsFiles for {@code deviceIds[0]}, each holding one point at time 0: the
+   * first file contains only {@code measurementSchemas[0]}, the second only
+   * {@code measurementSchemas[1]}.
+   *
+   * <p>Both files are produced by {@link #prepareSingleSeriesTsFile(int)}.
+   *
+   * @return the two resources in creation order
+   */
+  List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int index = 0; index < 2; index++) {
+      resources.add(prepareSingleSeriesTsFile(index));
+    }
+    return resources;
+  }
+
+  /**
+   * Writes a TsFile whose only content is the point (time 0, value "0") of
+   * {@code measurementSchemas[index]} under {@code deviceIds[0]}.
+   *
+   * @param index second field of the generated file name, plan index of the resource and
+   *     position of the series schema in {@code measurementSchemas}
+   * @return the resource of the written file, already marked as closed
+   */
+  private TsFileResource prepareSingleSeriesTsFile(int index)
+      throws IOException, WriteProcessException {
+    // file name layout: <nanoTime><sep><index><sep>0<sep>0.tsfile
+    String fileName =
+        System.nanoTime()
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + index
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile";
+
+    TsFileResource resource =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(fileName)));
+    resource.setClosed(true);
+    resource.updatePlanIndexes((long) index);
+
+    // register the single series and write its one data point
+    TsFileWriter writer = new TsFileWriter(resource.getTsFile());
+    writer.registerTimeseries(
+        new Path(deviceIds[0], measurementSchemas[index].getMeasurementId()),
+        measurementSchemas[index]);
+    TSRecord record = new TSRecord(0, deviceIds[0]);
+    record.addTuple(
+        DataPoint.getDataPoint(
+            measurementSchemas[index].getType(),
+            measurementSchemas[index].getMeasurementId(),
+            String.valueOf(0)));
+    writer.write(record);
+    writer.close();
+    return resource;
+  }
 }