Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 09:10:24 UTC

[iotdb] branch fix_compaction_ttl created (now 21f566b)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_compaction_ttl
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 21f566b  fix compaction ttl bug

This branch includes the following new commits:

     new 21f566b  fix compaction ttl bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix compaction ttl bug

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_compaction_ttl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 21f566b07660c58ead6162f4e6dcc3ba92a5bd86
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 10 17:09:45 2021 +0800

    fix compaction ttl bug
---
 .../level/LevelCompactionTsFileManagement.java     |   7 +
 .../engine/compaction/utils/CompactionUtils.java   | 284 +++++++++++----------
 2 files changed, 151 insertions(+), 140 deletions(-)
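
The patch does two things. In LevelCompactionTsFileManagement it flags
every compaction source file as merging while the task runs (and clears
the flag again during recovery), which suggests the bug was TTL cleanup
deleting files that an in-flight compaction was still reading. In
CompactionUtils it wraps the merge loop in try/finally so the sequence
readers are always closed. As a rough illustration of the first point,
here is a toy TTL sweep; this is not IoTDB's actual TTL code, and every
name except the merging flag is invented:

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the race the merging flag guards against: a TTL
    // sweeper must not delete a file that a compaction still reads.
    class TtlSweepSketch {
      static class Resource {
        final String name;
        final long maxTime;        // newest timestamp in the file
        volatile boolean merging;  // true while a compaction reads this file

        Resource(String name, long maxTime) {
          this.name = name;
          this.maxTime = maxTime;
        }
      }

      // Delete a file only if all its data is past TTL and no compaction holds it.
      static void sweep(List<Resource> files, long now, long ttl) {
        List<Resource> deletable = new ArrayList<>();
        for (Resource r : files) {
          boolean expired = r.maxTime < now - ttl;
          if (expired && !r.merging) {
            deletable.add(r);
          }
        }
        for (Resource r : deletable) {
          files.remove(r);
          System.out.println("deleted " + r.name);
        }
      }

      public static void main(String[] args) {
        List<Resource> files = new ArrayList<>();
        Resource inUse = new Resource("merging.tsfile", 1_000L);
        inUse.merging = true;             // compaction in flight: must survive
        files.add(inUse);
        files.add(new Resource("stale.tsfile", 1_000L));
        sweep(files, 100_000L, 10_000L);  // deletes only stale.tsfile
      }
    }

Both files are past TTL in this toy run, but only stale.tsfile is
removed; merging.tsfile survives until its flag is cleared.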

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..afefcd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -660,6 +660,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
             // log source file list and target file for recover
             for (TsFileResource mergeResource : mergeResources.get(i)) {
+              mergeResource.setMerging(true);
               compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
             }
             File newLevelFile =
@@ -844,6 +845,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
         logAnalyzer.analyze();
         String targetFilePath = logAnalyzer.getTargetFile();
+        List<String> sourceFileList = logAnalyzer.getSourceFiles();
+        boolean isSeq = logAnalyzer.isSeq();
+        for (String file : sourceFileList) {
+          TsFileResource fileResource = getTsFileResource(file, isSeq);
+          fileResource.setMerging(false);
+        }
         if (targetFilePath != null) {
           File targetFile = new File(targetFilePath);
           if (targetFile.exists()) {
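
The recovery hunk above re-reads the compaction log on restart and
clears the merging flag on every recorded source file, so an
interrupted compaction does not leave files pinned forever. A minimal
sketch of that step, with a plain Map standing in for the real
getTsFileResource(file, isSeq) lookup (toy types, not the actual
CompactionLogAnalyzer API):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // After a restart, release every source file recorded in the log.
    class CompactionRecoverySketch {
      static class Resource {
        volatile boolean merging = true;  // left set by the interrupted task
      }

      static void recover(List<String> sourceFiles, Map<String, Resource> registry) {
        for (String file : sourceFiles) {
          Resource r = registry.get(file);
          if (r != null) {      // the file may already be gone
            r.merging = false;  // compaction did not finish: release it
          }
        }
      }

      public static void main(String[] args) {
        Map<String, Resource> registry = new HashMap<>();
        registry.put("a.tsfile", new Resource());
        recover(Arrays.asList("a.tsfile", "b.tsfile"), registry);
        System.out.println(registry.get("a.tsfile").merging);  // false
      }
    }

Unlike the patch, the sketch tolerates a source file that no longer
resolves to a resource; the patch assumes getTsFileResource always
succeeds.
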
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index a6aa68c..22705af 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -260,142 +260,92 @@ public class CompactionUtils {
       boolean sequence,
       List<Modification> modifications)
       throws IOException, IllegalPathException {
-    RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
-    Map<String, List<Modification>> modificationCache = new HashMap<>();
-    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
-    Set<String> tsFileDevicesMap =
-        getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
-    for (String device : tsFileDevicesMap) {
-      if (devices.contains(device)) {
-        continue;
-      }
-      writer.startChunkGroup(device);
-      Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
-          new TreeMap<>(
-              (o1, o2) ->
-                  TsFileManagement.compareFileName(
-                      new File(o1.getFileName()), new File(o2.getFileName())));
-      Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
-          chunkMetadataListIteratorCache =
-              new TreeMap<>(
-                  (o1, o2) ->
-                      TsFileManagement.compareFileName(
-                          new File(o1.getFileName()), new File(o2.getFileName())));
-      for (TsFileResource tsFileResource : tsFileResources) {
-        TsFileSequenceReader reader =
-            buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
-        if (reader == null) {
-          throw new IOException();
+    try {
+      RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+      Map<String, List<Modification>> modificationCache = new HashMap<>();
+      RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE()
+          .getMergeWriteRateLimiter();
+      Set<String> tsFileDevicesMap =
+          getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
+      for (String device : tsFileDevicesMap) {
+        if (devices.contains(device)) {
+          continue;
         }
-        Iterator<Map<String, List<ChunkMetadata>>> iterator =
-            reader.getMeasurementChunkMetadataListMapIterator(device);
-        chunkMetadataListIteratorCache.put(reader, iterator);
-        chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
-      }
-      while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
-        String lastSensor = null;
-        Set<String> allSensors = new HashSet<>();
-        for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-            chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-          TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-          Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-              chunkMetadataListCacheForMergeEntry.getValue();
-          if (sensorChunkMetadataListMap.size() <= 0) {
-            if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
-              sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
-              chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
-            } else {
-              continue;
-            }
-          }
-          // lastSensor = the minimum over all readers of each cache's max sensor
-          String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
-          if (lastSensor == null) {
-            lastSensor = maxSensor;
-          } else {
-            if (maxSensor.compareTo(lastSensor) < 0) {
-              lastSensor = maxSensor;
-            }
+        writer.startChunkGroup(device);
+        Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
+            new TreeMap<>(
+                (o1, o2) ->
+                    TsFileManagement.compareFileName(
+                        new File(o1.getFileName()), new File(o2.getFileName())));
+        Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
+            chunkMetadataListIteratorCache =
+            new TreeMap<>(
+                (o1, o2) ->
+                    TsFileManagement.compareFileName(
+                        new File(o1.getFileName()), new File(o2.getFileName())));
+        for (TsFileResource tsFileResource : tsFileResources) {
+          TsFileSequenceReader reader =
+              buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+          if (reader == null) {
+            throw new IOException();
           }
-          // collect all sensors for later use
-          allSensors.addAll(sensorChunkMetadataListMap.keySet());
+          Iterator<Map<String, List<ChunkMetadata>>> iterator =
+              reader.getMeasurementChunkMetadataListMapIterator(device);
+          chunkMetadataListIteratorCache.put(reader, iterator);
+          chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
         }
-
-        for (String sensor : allSensors) {
-          if (sensor.compareTo(lastSensor) <= 0) {
-            Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
-                new TreeMap<>(
-                    (o1, o2) ->
-                        TsFileManagement.compareFileName(
-                            new File(o1.getFileName()), new File(o2.getFileName())));
-            // find all chunkMetadata of a sensor
-            for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-                chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
-              TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
-              Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
-                  chunkMetadataListCacheForMergeEntry.getValue();
-              if (sensorChunkMetadataListMap.containsKey(sensor)) {
-                readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
-                sensorChunkMetadataListMap.remove(sensor);
+        while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+          String lastSensor = null;
+          Set<String> allSensors = new HashSet<>();
+          for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+              chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+            TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+            Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                chunkMetadataListCacheForMergeEntry.getValue();
+            if (sensorChunkMetadataListMap.size() <= 0) {
+              if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
+                sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
+                chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
+              } else {
+                continue;
               }
             }
-            Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
-                sensorReaderChunkMetadataListEntry =
-                    new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
-            if (!sequence) {
-              writeByDeserializePageMerge(
-                  device,
-                  compactionWriteRateLimiter,
-                  sensorReaderChunkMetadataListEntry,
-                  targetResource,
-                  writer,
-                  modificationCache,
-                  modifications);
+            // lastSensor = the minimum over all readers of each cache's max sensor
+            String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
+            if (lastSensor == null) {
+              lastSensor = maxSensor;
             } else {
-              boolean isChunkEnoughLarge = true;
-              boolean isPageEnoughLarge = true;
-              for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
-                for (ChunkMetadata chunkMetadata : chunkMetadatas) {
-                  if (chunkMetadata.getNumOfPoints()
-                      < IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getMergePagePointNumberThreshold()) {
-                    isPageEnoughLarge = false;
-                  }
-                  if (chunkMetadata.getNumOfPoints()
-                      < IoTDBDescriptor.getInstance()
-                          .getConfig()
-                          .getMergeChunkPointNumberThreshold()) {
-                    isChunkEnoughLarge = false;
-                  }
+              if (maxSensor.compareTo(lastSensor) < 0) {
+                lastSensor = maxSensor;
+              }
+            }
+            // collect all sensors for later use
+            allSensors.addAll(sensorChunkMetadataListMap.keySet());
+          }
+
+          for (String sensor : allSensors) {
+            if (sensor.compareTo(lastSensor) <= 0) {
+              Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+                  new TreeMap<>(
+                      (o1, o2) ->
+                          TsFileManagement.compareFileName(
+                              new File(o1.getFileName()), new File(o2.getFileName())));
+              // find all chunkMetadata of a sensor
+              for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+                  chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+                TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+                Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+                    chunkMetadataListCacheForMergeEntry.getValue();
+                if (sensorChunkMetadataListMap.containsKey(sensor)) {
+                  readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
+                  sensorChunkMetadataListMap.remove(sensor);
                 }
               }
-              // if a chunk is large enough, its pages must be large enough too
-              if (isChunkEnoughLarge) {
-                logger.debug(
-                    "{} [Compaction] chunk large enough, use append chunk merge", storageGroup);
-                // append whole chunks, so we do not have to deserialize them
-                writeByAppendChunkMerge(
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer);
-              } else if (isPageEnoughLarge) {
-                logger.debug(
-                    "{} [Compaction] page enough large, use append page merge", storageGroup);
-                // append page in chunks, so we do not have to deserialize a chunk
-                writeByAppendPageMerge(
-                    device,
-                    compactionWriteRateLimiter,
-                    sensorReaderChunkMetadataListEntry,
-                    targetResource,
-                    writer);
-              } else {
-                logger.debug(
-                    "{} [Compaction] page too small, use deserialize page merge", storageGroup);
-                // we have to deserialize chunks to merge pages
+              Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
+                  sensorReaderChunkMetadataListEntry =
+                  new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
+              if (!sequence) {
                 writeByDeserializePageMerge(
                     device,
                     compactionWriteRateLimiter,
@@ -404,27 +354,81 @@ public class CompactionUtils {
                     writer,
                     modificationCache,
                     modifications);
+              } else {
+                boolean isChunkEnoughLarge = true;
+                boolean isPageEnoughLarge = true;
+                for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
+                  for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+                    if (chunkMetadata.getNumOfPoints()
+                        < IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getMergePagePointNumberThreshold()) {
+                      isPageEnoughLarge = false;
+                    }
+                    if (chunkMetadata.getNumOfPoints()
+                        < IoTDBDescriptor.getInstance()
+                        .getConfig()
+                        .getMergeChunkPointNumberThreshold()) {
+                      isChunkEnoughLarge = false;
+                    }
+                  }
+                }
+                // if a chunk is large enough, its pages must be large enough too
+                if (isChunkEnoughLarge) {
+                  logger.debug(
+                      "{} [Compaction] chunk large enough, use append chunk merge", storageGroup);
+                  // append whole chunks, so we do not have to deserialize them
+                  writeByAppendChunkMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer);
+                } else if (isPageEnoughLarge) {
+                  logger.debug(
+                      "{} [Compaction] page enough large, use append page merge", storageGroup);
+                  // append page in chunks, so we do not have to deserialize a chunk
+                  writeByAppendPageMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer);
+                } else {
+                  logger.debug(
+                      "{} [Compaction] page too small, use deserialize page merge", storageGroup);
+                  // we have to deserialize chunks to merge pages
+                  writeByDeserializePageMerge(
+                      device,
+                      compactionWriteRateLimiter,
+                      sensorReaderChunkMetadataListEntry,
+                      targetResource,
+                      writer,
+                      modificationCache,
+                      modifications);
+                }
               }
             }
           }
         }
+        writer.endChunkGroup();
+        if (compactionLogger != null) {
+          compactionLogger.logDevice(device, writer.getPos());
+        }
       }
-      writer.endChunkGroup();
-      if (compactionLogger != null) {
-        compactionLogger.logDevice(device, writer.getPos());
-      }
-    }
 
-    for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
-      reader.close();
-    }
+      for (TsFileResource tsFileResource : tsFileResources) {
+        targetResource.updatePlanIndexes(tsFileResource);
+      }
+      targetResource.serialize();
+      writer.endFile();
+      targetResource.close();
 
-    for (TsFileResource tsFileResource : tsFileResources) {
-      targetResource.updatePlanIndexes(tsFileResource);
+    } finally {
+      for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
+        reader.close();
+      }
     }
-    targetResource.serialize();
-    writer.endFile();
-    targetResource.close();
   }
 
   private static TsFileSequenceReader buildReaderFromTsFileResource(
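
Most of the CompactionUtils hunk above is a re-indent: the merge loop
now runs inside a try block, and the readers cached in
tsFileSequenceReaderMap are closed in a finally block, so an exception
halfway through a merge no longer leaks open file handles. A
self-contained sketch of that pattern, with a toy Reader in place of
TsFileSequenceReader:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // The merge work runs in try; every cached reader is closed in
    // finally, on success and on failure alike.
    class CloseReadersSketch {
      static class Reader implements Closeable {
        @Override
        public void close() {
          System.out.println("reader closed");
        }
      }

      static void merge(Map<String, Reader> readerCache, boolean fail) throws IOException {
        try {
          readerCache.put("a.tsfile", new Reader());
          if (fail) {
            throw new IOException("simulated merge failure");
          }
          // ... write merged chunks and seal the target file ...
        } finally {
          for (Reader reader : readerCache.values()) {
            reader.close();
          }
        }
      }

      public static void main(String[] args) {
        try {
          merge(new HashMap<>(), true);  // reader is closed despite the throw
        } catch (IOException expected) {
          System.out.println("caught: " + expected.getMessage());
        }
      }
    }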