You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/06/04 10:52:38 UTC

[iotdb] branch rel/0.11 updated: fix compaction ttl bug (#3341)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 5d0d5ac  fix compaction ttl bug (#3341)
5d0d5ac is described below

commit 5d0d5acd9db9048a4ec095c7503615bda183d59a
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jun 4 18:52:20 2021 +0800

    fix compaction ttl bug (#3341)
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../level/LevelCompactionTsFileManagement.java     |  90 ++++++--
 .../engine/compaction/utils/CompactionUtils.java   | 250 +++++++++++----------
 2 files changed, 196 insertions(+), 144 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index c3281e3..9f00b4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -124,7 +124,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     logger.debug("{} [compaction] merge starts to delete real file", storageGroupName);
     for (TsFileResource mergeTsFile : mergeTsFiles) {
       deleteLevelFile(mergeTsFile);
-      logger.info(
+      logger.debug(
           "{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
     }
   }
@@ -570,20 +570,22 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     if (enableUnseqCompaction
         && unseqLevelNum <= 1
         && forkedUnSequenceTsFileResources.get(0).size() > 0) {
-      isMergeExecutedInCurrentTask = isMergeExecutedInCurrentTask ||
-          merge(
-              isForceFullMerge,
-              getTsFileListByTimePartition(true, timePartition),
-              forkedUnSequenceTsFileResources.get(0),
-              Long.MAX_VALUE);
+      isMergeExecutedInCurrentTask =
+          isMergeExecutedInCurrentTask
+              || merge(
+                  isForceFullMerge,
+                  getTsFileListByTimePartition(true, timePartition),
+                  forkedUnSequenceTsFileResources.get(0),
+                  Long.MAX_VALUE);
     } else {
-      isMergeExecutedInCurrentTask = isMergeExecutedInCurrentTask ||
-          merge(
-              forkedUnSequenceTsFileResources,
-              false,
-              timePartition,
-              unseqLevelNum,
-              unseqFileNumInEachLevel);
+      isMergeExecutedInCurrentTask =
+          isMergeExecutedInCurrentTask
+              || merge(
+                  forkedUnSequenceTsFileResources,
+                  false,
+                  timePartition,
+                  unseqLevelNum,
+                  unseqFileNumInEachLevel);
     }
   }
 
@@ -608,6 +610,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     long startTimeMillis = System.currentTimeMillis();
     // whether execute merge chunk in the loop below
     boolean isMergeExecutedInCurrentTask = false;
+    CompactionLogger compactionLogger = null;
     try {
       logger.info("{} start to filter compaction condition", storageGroupName);
       for (int i = 0; i < currMaxLevel - 1; i++) {
@@ -620,16 +623,17 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             // do not merge current unseq file level to upper level and just merge all of them to
             // seq file
             isSeqMerging = false;
-            isMergeExecutedInCurrentTask = merge(
-                isForceFullMerge,
-                getTsFileListByTimePartition(true, timePartition),
-                mergeResources.get(i),
-                Long.MAX_VALUE);
+            isMergeExecutedInCurrentTask =
+                merge(
+                    isForceFullMerge,
+                    getTsFileListByTimePartition(true, timePartition),
+                    mergeResources.get(i),
+                    Long.MAX_VALUE);
           } else {
-            CompactionLogger compactionLogger =
-                new CompactionLogger(storageGroupDir, storageGroupName);
+            compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
             // log source file list and target file for recover
             for (TsFileResource mergeResource : mergeResources.get(i)) {
+              mergeResource.setMerging(true);
               compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
             }
             File newLevelFile =
@@ -691,6 +695,14 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         }
       }
     } catch (Exception e) {
+      if (compactionLogger != null) {
+        try {
+          compactionLogger.close();
+        } catch (IOException ioException) {
+          logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
+        }
+      }
+      restoreCompaction();
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
       isSeqMerging = false;
@@ -704,6 +716,42 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     return isMergeExecutedInCurrentTask;
   }
 
+  /** restore the files back to the status before the compaction task is submitted */
+  private void restoreCompaction() {
+    File logFile =
+        FSFactoryProducer.getFSFactory()
+            .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+    try {
+      if (logFile.exists()) {
+        CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
+        logAnalyzer.analyze();
+        String targetFilePath = logAnalyzer.getTargetFile();
+        List<String> sourceFileList = logAnalyzer.getSourceFiles();
+        boolean isSeq = logAnalyzer.isSeq();
+        for (String file : sourceFileList) {
+          TsFileResource fileResource = getTsFileResource(file, isSeq);
+          fileResource.setMerging(false);
+        }
+        if (targetFilePath != null) {
+          File targetFile = new File(targetFilePath);
+          if (targetFile.exists()) {
+            targetFile.delete();
+          }
+        }
+      }
+    } catch (IOException e) {
+      logger.error("restore compaction failed", e);
+    } finally {
+      if (logFile.exists()) {
+        try {
+          Files.delete(logFile.toPath());
+        } catch (IOException e) {
+          logger.error("delete compaction log file error ", e);
+        }
+      }
+    }
+  }
+
   /** if level < maxLevel-1, the file need compaction else, the file can be merged later */
   private File createNewTsFileName(File sourceFile, int level) {
     String path = sourceFile.getPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 3ca13fe..4662569 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -254,125 +254,93 @@ public class CompactionUtils {
       boolean sequence,
       List<Modification> modifications)
       throws IOException, IllegalPathException {
-    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
     Map<String, List<Modification>> modificationCache = new HashMap<>();
-    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
-    Set<String> tsFileDevicesMap =
-        getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
-    for (String device : tsFileDevicesMap) {
-      if (devices.contains(device)) {
-        continue;
-      }
-      long maxVersion = Long.MIN_VALUE;
-      writer.startChunkGroup(device);
-      Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
-          new TreeMap<>(
-              (o1, o2) ->
-                  TsFileManagement.compareFileName(
-                      new File(o1.getFileName()), new File(o2.getFileName())));
-      Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
-          chunkMetadataListIteratorCache =
-              new TreeMap<>(
-                  (o1, o2) ->
-                      TsFileManagement.compareFileName(
-                          new File(o1.getFileName()), new File(o2.getFileName())));
-      for (TsFileResource tsFileResource : tsFileResources) {
-        TsFileSequenceReader reader =
-            buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
-        if (reader == null) {
-          throw new IOException();
-        }
-        Iterator<Map<String, List<ChunkMetadata>>> iterator =
-            reader.getMeasurementChunkMetadataListMapIterator(device);
-        chunkMetadataListIteratorCache.put(reader, iterator);
-        chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
-      }
-      while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
-        String lastSensor = null;
-        Set<String> allSensors = new HashSet<>();
-        for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-            chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-          TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-          Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-              chunkMetadataListCacheForMergeEntry.getValue();
-          if (sensorChunkMetadataListMap.size() <= 0) {
-            if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
-              sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
-              chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
-            } else {
-              continue;
-            }
-          }
-          // get the min last sensor in the current chunkMetadata cache list for merge
-          String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
-          if (lastSensor == null) {
-            lastSensor = maxSensor;
-          } else {
-            if (maxSensor.compareTo(lastSensor) < 0) {
-              lastSensor = maxSensor;
-            }
-          }
-          // get all sensor used later
-          allSensors.addAll(sensorChunkMetadataListMap.keySet());
+    try {
+      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+      RateLimiter compactionWriteRateLimiter =
+          MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+      Set<String> tsFileDevicesMap =
+          getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
+      for (String device : tsFileDevicesMap) {
+        if (devices.contains(device)) {
+          continue;
         }
-
-        for (String sensor : allSensors) {
-          if (sensor.compareTo(lastSensor) <= 0) {
-            Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+        long maxVersion = Long.MIN_VALUE;
+        writer.startChunkGroup(device);
+        Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
+            new TreeMap<>(
+                (o1, o2) ->
+                    TsFileManagement.compareFileName(
+                        new File(o1.getFileName()), new File(o2.getFileName())));
+        Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
+            chunkMetadataListIteratorCache =
                 new TreeMap<>(
                     (o1, o2) ->
                         TsFileManagement.compareFileName(
                             new File(o1.getFileName()), new File(o2.getFileName())));
-            // find all chunkMetadata of a sensor
-            for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-                chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-              TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-              Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-                  chunkMetadataListCacheForMergeEntry.getValue();
-              if (sensorChunkMetadataListMap.containsKey(sensor)) {
-                readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
-                sensorChunkMetadataListMap.remove(sensor);
+        for (TsFileResource tsFileResource : tsFileResources) {
+          TsFileSequenceReader reader =
+              buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+          if (reader == null) {
+            throw new IOException();
+          }
+          Iterator<Map<String, List<ChunkMetadata>>> iterator =
+              reader.getMeasurementChunkMetadataListMapIterator(device);
+          chunkMetadataListIteratorCache.put(reader, iterator);
+          chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
+        }
+        while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+          String lastSensor = null;
+          Set<String> allSensors = new HashSet<>();
+          for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+              chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+            TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+            Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                chunkMetadataListCacheForMergeEntry.getValue();
+            if (sensorChunkMetadataListMap.size() <= 0) {
+              if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
+                sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
+                chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
+              } else {
+                continue;
               }
             }
-            Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
-                sensorReaderChunkMetadataListEntry =
-                    new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
-            if (!sequence) {
-              writeByDeserializeMerge(
-                  maxVersion,
-                  device,
-                  compactionWriteRateLimiter,
-                  sensorReaderChunkMetadataListEntry,
-                  targetResource,
-                  writer,
-                  modificationCache,
-                  modifications);
+            // get the min last sensor in the current chunkMetadata cache list for merge
+            String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
+            if (lastSensor == null) {
+              lastSensor = maxSensor;
             } else {
-              boolean isPageEnoughLarge = true;
-              for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
-                for (ChunkMetadata chunkMetadata : chunkMetadatas) {
-                  if (chunkMetadata.getNumOfPoints() < MERGE_PAGE_POINT_NUM) {
-                    isPageEnoughLarge = false;
-                    break;
-                  }
+              if (maxSensor.compareTo(lastSensor) < 0) {
+                lastSensor = maxSensor;
+              }
+            }
+            // get all sensor used later
+            allSensors.addAll(sensorChunkMetadataListMap.keySet());
+          }
+
+          for (String sensor : allSensors) {
+            if (sensor.compareTo(lastSensor) <= 0) {
+              Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+                  new TreeMap<>(
+                      (o1, o2) ->
+                          TsFileManagement.compareFileName(
+                              new File(o1.getFileName()), new File(o2.getFileName())));
+              // find all chunkMetadata of a sensor
+              for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+                  chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+                TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+                Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                    chunkMetadataListCacheForMergeEntry.getValue();
+                if (sensorChunkMetadataListMap.containsKey(sensor)) {
+                  readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
+                  sensorChunkMetadataListMap.remove(sensor);
                 }
               }
-              if (isPageEnoughLarge) {
-                logger.debug("{} [Compaction] page enough large, use append merge", storageGroup);
-                // append page in chunks, so we do not have to deserialize a chunk
-                writeByAppendMerge(
-                    maxVersion,
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer,
-                    modificationCache,
-                    modifications);
-              } else {
-                logger.debug("{} [Compaction] page too small, use deserialize merge", storageGroup);
-                // we have to deserialize chunks to merge pages
+              Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
+                  sensorReaderChunkMetadataListEntry =
+                      new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
+              if (!sequence) {
                 writeByDeserializeMerge(
                     maxVersion,
                     device,
@@ -382,29 +350,65 @@ public class CompactionUtils {
                     writer,
                     modificationCache,
                     modifications);
+              } else {
+                boolean isPageEnoughLarge = true;
+                for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
+                  for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+                    if (chunkMetadata.getNumOfPoints() < MERGE_PAGE_POINT_NUM) {
+                      isPageEnoughLarge = false;
+                      break;
+                    }
+                  }
+                }
+                if (isPageEnoughLarge) {
+                  logger.debug("{} [Compaction] page enough large, use append merge", storageGroup);
+                  // append page in chunks, so we do not have to deserialize a chunk
+                  writeByAppendMerge(
+                      maxVersion,
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer,
+                      modificationCache,
+                      modifications);
+                } else {
+                  logger.debug(
+                      "{} [Compaction] page too small, use deserialize merge", storageGroup);
+                  // we have to deserialize chunks to merge pages
+                  writeByDeserializeMerge(
+                      maxVersion,
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer,
+                      modificationCache,
+                      modifications);
+                }
               }
             }
           }
         }
+        writer.endChunkGroup();
+        writer.writeVersion(maxVersion);
+        if (compactionLogger != null) {
+          compactionLogger.logDevice(device, writer.getPos());
+        }
       }
-      writer.endChunkGroup();
-      writer.writeVersion(maxVersion);
-      if (compactionLogger != null) {
-        compactionLogger.logDevice(device, writer.getPos());
+      Set<Long> historicalVersions = new HashSet<>();
+      for (TsFileResource tsFileResource : tsFileResources) {
+        historicalVersions.addAll(tsFileResource.getHistoricalVersions());
+      }
+      targetResource.setHistoricalVersions(historicalVersions);
+      targetResource.serialize();
+      writer.endFile();
+      targetResource.close();
+    } finally {
+      for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
+        reader.close();
       }
     }
-
-    for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
-      reader.close();
-    }
-    Set<Long> historicalVersions = new HashSet<>();
-    for (TsFileResource tsFileResource : tsFileResources) {
-      historicalVersions.addAll(tsFileResource.getHistoricalVersions());
-    }
-    targetResource.setHistoricalVersions(historicalVersions);
-    targetResource.serialize();
-    writer.endFile();
-    targetResource.close();
   }
 
   private static TsFileSequenceReader buildReaderFromTsFileResource(