You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/06/10 17:20:31 UTC

[iotdb] branch fix_compaction_loss_data created (now 31e4257)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a change to branch fix_compaction_loss_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 31e4257  fix bug and add UT

This branch includes the following new commits:

     new 31e4257  fix bug and add UT

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix bug and add UT

Posted by ej...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch fix_compaction_loss_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 31e4257c93040856fc0fb034381db8e1320f594a
Author: EJTTianyu <16...@qq.com>
AuthorDate: Fri Jun 11 01:19:49 2021 +0800

    fix bug and add UT
---
 .../engine/compaction/utils/CompactionUtils.java   |  5 ++-
 .../compaction/LevelCompactionMergeTest.java       | 50 ++++++++++++++++++++++
 .../db/engine/compaction/LevelCompactionTest.java  | 43 +++++++++++++++++++
 3 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 822b149..e5ea202 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -318,7 +318,10 @@ public class CompactionUtils {
             // get all sensor used later
             allSensors.addAll(sensorChunkMetadataListMap.keySet());
           }
-
+          // if there is no more chunkMetaData, merge all the sensors
+          if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+            lastSensor = Collections.max(allSensors);
+          }
           for (String sensor : allSensors) {
             if (sensor.compareTo(lastSensor) <= 0) {
               Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index f687957..6656993 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -66,6 +66,56 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
     FileUtils.deleteDirectory(tempSGDir);
   }
 
+  /**
+   * The deviceIds[0] / measurementSchemas[1] series must yield exactly one point both before and
+   * after compacting the two files from prepareTsFileResources(); config is restored in finally.
+   */
+  @Test
+  public void testCompactionDiffTimeSeries()
+      throws IOException, WriteProcessException, IllegalPathException {
+    int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
+    int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
+    IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
+    IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
+    try { // the finally block puts the global level settings back even if an assertion fails
+      List<TsFileResource> compactionFiles = prepareTsFileResources();
+      LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+          new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+      levelCompactionTsFileManagement.addAll(compactionFiles, true);
+      PartialPath path = new PartialPath(
+          deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[1].getMeasurementId());
+      IBatchReader tsFilesReader = new SeriesRawDataBatchReader(
+          path, measurementSchemas[1].getType(), new QueryContext(),
+          levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true);
+      int count = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        count += tsFilesReader.nextBatch().length();
+      }
+      assertEquals(1, count); // expected value first, so a failure message reads correctly
+
+      levelCompactionTsFileManagement.forkCurrentFileList(0);
+      CompactionOnePartitionUtil compactionOnePartitionUtil =
+          levelCompactionTsFileManagement.new CompactionOnePartitionUtil(
+              this::closeCompactionMergeCallBack, 0);
+      compactionMergeWorking = true;
+      compactionOnePartitionUtil.run();
+      while (compactionMergeWorking) {
+        // spin until the flag is reset (presumably by closeCompactionMergeCallBack; confirm)
+      }
+      tsFilesReader = new SeriesRawDataBatchReader(
+          path, measurementSchemas[1].getType(), new QueryContext(),
+          levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true);
+      count = 0;
+      while (tsFilesReader.hasNextBatch()) {
+        count += tsFilesReader.nextBatch().length();
+      }
+      assertEquals(1, count); // the point must still be readable after the compaction run
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
+      IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
+    }
+  }
+
   /**
    * just compaction once
    */
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index dc71ea4..dfa69d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -204,4 +204,47 @@ abstract class LevelCompactionTest {
     fileWriter.close();
   }
 
+  /**
+   * Builds two closed TsFiles for deviceIds[0], each written with a single record (time argument
+   * 0, value "0"): the first registers only measurementSchemas[0], the second only [1].
+   */
+  List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException {
+    // first file: plan index 0, holds measurementSchemas[0]
+    String firstName = System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 0
+        + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + ".tsfile";
+    TsFileResource firstResource =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(firstName)));
+    firstResource.setClosed(true);
+    firstResource.updatePlanIndexes(0L);
+    TsFileWriter firstWriter = new TsFileWriter(firstResource.getTsFile());
+    Path firstSeries = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
+    firstWriter.registerTimeseries(firstSeries, measurementSchemas[0]);
+    TSRecord firstRecord = new TSRecord(0, deviceIds[0]);
+    firstRecord.addTuple(DataPoint.getDataPoint(
+        measurementSchemas[0].getType(), measurementSchemas[0].getMeasurementId(),
+        String.valueOf(0)));
+    firstWriter.write(firstRecord);
+    firstWriter.close();
+    // second file: plan index 1, holds measurementSchemas[1]
+    String secondName = System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 1
+        + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + ".tsfile";
+    TsFileResource secondResource =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(secondName)));
+    secondResource.setClosed(true);
+    secondResource.updatePlanIndexes(1L);
+    TsFileWriter secondWriter = new TsFileWriter(secondResource.getTsFile());
+    Path secondSeries = new Path(deviceIds[0], measurementSchemas[1].getMeasurementId());
+    secondWriter.registerTimeseries(secondSeries, measurementSchemas[1]);
+    TSRecord secondRecord = new TSRecord(0, deviceIds[0]);
+    secondRecord.addTuple(DataPoint.getDataPoint(
+        measurementSchemas[1].getType(), measurementSchemas[1].getMeasurementId(),
+        String.valueOf(0)));
+    secondWriter.write(secondRecord);
+    secondWriter.close();
+    List<TsFileResource> resources = new ArrayList<>();
+    resources.add(firstResource);
+    resources.add(secondResource);
+    return resources;
+  }
+
 }
\ No newline at end of file