You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 09:10:25 UTC

[iotdb] 01/01: fix compaction ttl bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_compaction_ttl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 21f566b07660c58ead6162f4e6dcc3ba92a5bd86
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 10 17:09:45 2021 +0800

    fix compaction ttl bug
---
 .../level/LevelCompactionTsFileManagement.java     |   7 +
 .../engine/compaction/utils/CompactionUtils.java   | 284 +++++++++++----------
 2 files changed, 151 insertions(+), 140 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..afefcd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -660,6 +660,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
             // log source file list and target file for recover
             for (TsFileResource mergeResource : mergeResources.get(i)) {
+              mergeResource.setMerging(true);
               compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
             }
             File newLevelFile =
@@ -844,6 +845,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
         logAnalyzer.analyze();
         String targetFilePath = logAnalyzer.getTargetFile();
+        List<String> sourceFileList = logAnalyzer.getSourceFiles();
+        boolean isSeq = logAnalyzer.isSeq();
+        for (String file : sourceFileList) {
+          TsFileResource fileResource = getTsFileResource(file, isSeq);
+          fileResource.setMerging(false);
+        }
         if (targetFilePath != null) {
           File targetFile = new File(targetFilePath);
           if (targetFile.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index a6aa68c..22705af 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -260,142 +260,92 @@ public class CompactionUtils {
       boolean sequence,
       List<Modification> modifications)
       throws IOException, IllegalPathException {
-    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
-    Map<String, List<Modification>> modificationCache = new HashMap<>();
-    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
-    Set<String> tsFileDevicesMap =
-        getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
-    for (String device : tsFileDevicesMap) {
-      if (devices.contains(device)) {
-        continue;
-      }
-      writer.startChunkGroup(device);
-      Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
-          new TreeMap<>(
-              (o1, o2) ->
-                  TsFileManagement.compareFileName(
-                      new File(o1.getFileName()), new File(o2.getFileName())));
-      Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
-          chunkMetadataListIteratorCache =
-              new TreeMap<>(
-                  (o1, o2) ->
-                      TsFileManagement.compareFileName(
-                          new File(o1.getFileName()), new File(o2.getFileName())));
-      for (TsFileResource tsFileResource : tsFileResources) {
-        TsFileSequenceReader reader =
-            buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
-        if (reader == null) {
-          throw new IOException();
+    try {
+      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+      Map<String, List<Modification>> modificationCache = new HashMap<>();
+      RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE()
+          .getMergeWriteRateLimiter();
+      Set<String> tsFileDevicesMap =
+          getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
+      for (String device : tsFileDevicesMap) {
+        if (devices.contains(device)) {
+          continue;
         }
-        Iterator<Map<String, List<ChunkMetadata>>> iterator =
-            reader.getMeasurementChunkMetadataListMapIterator(device);
-        chunkMetadataListIteratorCache.put(reader, iterator);
-        chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
-      }
-      while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
-        String lastSensor = null;
-        Set<String> allSensors = new HashSet<>();
-        for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-            chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-          TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-          Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-              chunkMetadataListCacheForMergeEntry.getValue();
-          if (sensorChunkMetadataListMap.size() <= 0) {
-            if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
-              sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
-              chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
-            } else {
-              continue;
-            }
-          }
-          // get the min last sensor in the current chunkMetadata cache list for merge
-          String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
-          if (lastSensor == null) {
-            lastSensor = maxSensor;
-          } else {
-            if (maxSensor.compareTo(lastSensor) < 0) {
-              lastSensor = maxSensor;
-            }
+        writer.startChunkGroup(device);
+        Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
+            new TreeMap<>(
+                (o1, o2) ->
+                    TsFileManagement.compareFileName(
+                        new File(o1.getFileName()), new File(o2.getFileName())));
+        Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
+            chunkMetadataListIteratorCache =
+            new TreeMap<>(
+                (o1, o2) ->
+                    TsFileManagement.compareFileName(
+                        new File(o1.getFileName()), new File(o2.getFileName())));
+        for (TsFileResource tsFileResource : tsFileResources) {
+          TsFileSequenceReader reader =
+              buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+          if (reader == null) {
+            throw new IOException();
           }
-          // get all sensor used later
-          allSensors.addAll(sensorChunkMetadataListMap.keySet());
+          Iterator<Map<String, List<ChunkMetadata>>> iterator =
+              reader.getMeasurementChunkMetadataListMapIterator(device);
+          chunkMetadataListIteratorCache.put(reader, iterator);
+          chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
         }
-
-        for (String sensor : allSensors) {
-          if (sensor.compareTo(lastSensor) <= 0) {
-            Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
-                new TreeMap<>(
-                    (o1, o2) ->
-                        TsFileManagement.compareFileName(
-                            new File(o1.getFileName()), new File(o2.getFileName())));
-            // find all chunkMetadata of a sensor
-            for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-                chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-              TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-              Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-                  chunkMetadataListCacheForMergeEntry.getValue();
-              if (sensorChunkMetadataListMap.containsKey(sensor)) {
-                readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
-                sensorChunkMetadataListMap.remove(sensor);
+        while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+          String lastSensor = null;
+          Set<String> allSensors = new HashSet<>();
+          for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+              chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+            TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+            Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                chunkMetadataListCacheForMergeEntry.getValue();
+            if (sensorChunkMetadataListMap.size() <= 0) {
+              if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
+                sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
+                chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
+              } else {
+                continue;
               }
             }
-            Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
-                sensorReaderChunkMetadataListEntry =
-                    new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
-            if (!sequence) {
-              writeByDeserializePageMerge(
-                  device,
-                  compactionWriteRateLimiter,
-                  sensorReaderChunkMetadataListEntry,
-                  targetResource,
-                  writer,
-                  modificationCache,
-                  modifications);
+            // get the min last sensor in the current chunkMetadata cache list for merge
+            String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
+            if (lastSensor == null) {
+              lastSensor = maxSensor;
             } else {
-              boolean isChunkEnoughLarge = true;
-              boolean isPageEnoughLarge = true;
-              for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
-                for (ChunkMetadata chunkMetadata : chunkMetadatas) {
-                  if (chunkMetadata.getNumOfPoints()
-                      < IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getMergePagePointNumberThreshold()) {
-                    isPageEnoughLarge = false;
-                  }
-                  if (chunkMetadata.getNumOfPoints()
-                      < IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getMergeChunkPointNumberThreshold()) {
-                    isChunkEnoughLarge = false;
-                  }
+              if (maxSensor.compareTo(lastSensor) < 0) {
+                lastSensor = maxSensor;
+              }
+            }
+            // get all sensor used later
+            allSensors.addAll(sensorChunkMetadataListMap.keySet());
+          }
+
+          for (String sensor : allSensors) {
+            if (sensor.compareTo(lastSensor) <= 0) {
+              Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+                  new TreeMap<>(
+                      (o1, o2) ->
+                          TsFileManagement.compareFileName(
+                              new File(o1.getFileName()), new File(o2.getFileName())));
+              // find all chunkMetadata of a sensor
+              for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+                  chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+                TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+                Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                    chunkMetadataListCacheForMergeEntry.getValue();
+                if (sensorChunkMetadataListMap.containsKey(sensor)) {
+                  readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
+                  sensorChunkMetadataListMap.remove(sensor);
                 }
               }
-              // if a chunk is large enough, it's page must be large enough too
-              if (isChunkEnoughLarge) {
-                logger.debug(
-                    "{} [Compaction] chunk enough large, use append chunk merge", storageGroup);
-                // append page in chunks, so we do not have to deserialize a chunk
-                writeByAppendChunkMerge(
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer);
-              } else if (isPageEnoughLarge) {
-                logger.debug(
-                    "{} [Compaction] page enough large, use append page merge", storageGroup);
-                // append page in chunks, so we do not have to deserialize a chunk
-                writeByAppendPageMerge(
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer);
-              } else {
-                logger.debug(
-                    "{} [Compaction] page too small, use deserialize page merge", storageGroup);
-                // we have to deserialize chunks to merge pages
+              Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
+                  sensorReaderChunkMetadataListEntry =
+                  new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
+              if (!sequence) {
                 writeByDeserializePageMerge(
                     device,
                     compactionWriteRateLimiter,
@@ -404,27 +354,81 @@ public class CompactionUtils {
                     writer,
                     modificationCache,
                     modifications);
+              } else {
+                boolean isChunkEnoughLarge = true;
+                boolean isPageEnoughLarge = true;
+                for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
+                  for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+                    if (chunkMetadata.getNumOfPoints()
+                        < IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getMergePagePointNumberThreshold()) {
+                      isPageEnoughLarge = false;
+                    }
+                    if (chunkMetadata.getNumOfPoints()
+                        < IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getMergeChunkPointNumberThreshold()) {
+                      isChunkEnoughLarge = false;
+                    }
+                  }
+                }
+                // if a chunk is large enough, it's page must be large enough too
+                if (isChunkEnoughLarge) {
+                  logger.debug(
+                      "{} [Compaction] chunk enough large, use append chunk merge", storageGroup);
+                  // append page in chunks, so we do not have to deserialize a chunk
+                  writeByAppendChunkMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer);
+                } else if (isPageEnoughLarge) {
+                  logger.debug(
+                      "{} [Compaction] page enough large, use append page merge", storageGroup);
+                  // append page in chunks, so we do not have to deserialize a chunk
+                  writeByAppendPageMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer);
+                } else {
+                  logger.debug(
+                      "{} [Compaction] page too small, use deserialize page merge", storageGroup);
+                  // we have to deserialize chunks to merge pages
+                  writeByDeserializePageMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer,
+                      modificationCache,
+                      modifications);
+                }
               }
             }
           }
         }
+        writer.endChunkGroup();
+        if (compactionLogger != null) {
+          compactionLogger.logDevice(device, writer.getPos());
+        }
       }
-      writer.endChunkGroup();
-      if (compactionLogger != null) {
-        compactionLogger.logDevice(device, writer.getPos());
-      }
-    }
 
-    for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
-      reader.close();
-    }
+      for (TsFileResource tsFileResource : tsFileResources) {
+        targetResource.updatePlanIndexes(tsFileResource);
+      }
+      targetResource.serialize();
+      writer.endFile();
+      targetResource.close();
 
-    for (TsFileResource tsFileResource : tsFileResources) {
-      targetResource.updatePlanIndexes(tsFileResource);
+    } finally {
+      for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
+        reader.close();
+      }
     }
-    targetResource.serialize();
-    writer.endFile();
-    targetResource.close();
   }
 
   private static TsFileSequenceReader buildReaderFromTsFileResource(