You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/06/04 11:19:43 UTC

[iotdb] branch rel/0.12 updated: [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3338)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 347c047  [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3338)
347c047 is described below

commit 347c0470e9180aa1dd6b2c17a75efb9c8bbe2613
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jun 4 19:19:20 2021 +0800

    [IOTDB-1419][To rel/0.12] remove redundant clearCompactionStatus (#3338)
    
    remove redundant clearCompactionStatus
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../db/engine/compaction/TsFileManagement.java     |   5 +-
 .../level/LevelCompactionTsFileManagement.java     |  38 +++-----
 .../engine/compaction/utils/CompactionUtils.java   |   3 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 108 +++++++++++++--------
 .../db/engine/storagegroup/TsFileResource.java     |   5 +
 .../db/sync/receiver/load/FileLoaderTest.java      |   4 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      |   2 +-
 7 files changed, 91 insertions(+), 74 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 7a7a30c..fffbd3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -156,23 +156,20 @@ public abstract class TsFileManagement {
 
   protected abstract void merge(long timePartition);
 
-  public class CompactionMergeTask extends StorageGroupCompactionTask {
+  public class CompactionMergeTask {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
     public CompactionMergeTask(
         CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
-      super(storageGroupName);
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
       this.timePartitionId = timePartitionId;
     }
 
-    @Override
     public Void call() {
       merge(timePartitionId);
       closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
-      clearCompactionStatus();
       return null;
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 46a964f..4f4aa15 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -52,11 +52,9 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
 
 /** The TsFileManagement for LEVEL_COMPACTION, use level struct to manage TsFile list */
 public class LevelCompactionTsFileManagement extends TsFileManagement {
@@ -265,7 +263,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     writeLock();
     try {
       long timePartitionId = tsFileResource.getTimePartition();
-      int level = getMergeLevel(tsFileResource.getTsFile());
+      int level = TsFileResource.getMergeLevel(tsFileResource.getTsFile().getName());
       if (sequence) {
         if (level <= seqLevelNum - 1) {
           // current file has normal level
@@ -480,7 +478,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
             sourceTsFileResources.add(getTsFileResource(file, isSeq));
           }
-          int level = getMergeLevel(new File(sourceFileList.get(0)));
+          int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
           RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
           // if not complete compaction, resume merge
           if (writer.hasCrashed()) {
@@ -506,7 +504,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 throw new InterruptedException(
                     String.format("%s [Compaction] abort", storageGroupName));
               }
-              int targetLevel = getMergeLevel(targetResource.getTsFile());
+              int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
               if (isSeq) {
                 sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
                 sequenceRecoverTsFileResources.clear();
@@ -602,18 +600,20 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         && forkedUnSequenceTsFileResources.get(0).size() > 0) {
       isMergeExecutedInCurrentTask =
           merge(
-              isForceFullMerge,
-              getTsFileListByTimePartition(true, timePartition),
-              forkedUnSequenceTsFileResources.get(0),
-              Long.MAX_VALUE);
+                  isForceFullMerge,
+                  getTsFileListByTimePartition(true, timePartition),
+                  forkedUnSequenceTsFileResources.get(0),
+                  Long.MAX_VALUE)
+              || isMergeExecutedInCurrentTask;
     } else {
       isMergeExecutedInCurrentTask =
           merge(
-              forkedUnSequenceTsFileResources,
-              false,
-              timePartition,
-              unseqLevelNum,
-              unseqFileNumInEachLevel);
+                  forkedUnSequenceTsFileResources,
+                  false,
+                  timePartition,
+                  unseqLevelNum,
+                  unseqFileNumInEachLevel)
+              || isMergeExecutedInCurrentTask;
     }
   }
 
@@ -640,7 +640,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     boolean isMergeExecutedInCurrentTask = false;
     CompactionLogger compactionLogger = null;
     try {
-      logger.info("{} start to filter compaction condition", storageGroupName);
+      logger.debug("{} start to filter compaction condition", storageGroupName);
       for (int i = 0; i < currMaxLevel - 1; i++) {
         List<TsFileResource> currLevelTsFileResource = mergeResources.get(i);
         if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
@@ -779,14 +779,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     return newUnSequenceTsFileResources;
   }
 
-  public static int getMergeLevel(File file) {
-    String mergeLevelStr =
-        file.getPath()
-            .substring(file.getPath().lastIndexOf(FILE_NAME_SEPARATOR) + 1)
-            .replaceAll(TSFILE_SUFFIX, "");
-    return Integer.parseInt(mergeLevelStr);
-  }
-
   private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
       throws IOException {
     if (isSeq) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 2d3d6a7..4f81329 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -443,7 +443,8 @@ public class CompactionUtils {
               logger.info("{} tsfile does not exist", path);
               return null;
             }
-          } catch (IOException e) {
+          } catch (Exception e) {
+            logger.warn("{} tsfile may be destroyed", path);
             logger.error(
                 "Storage group {}, flush recover meets error. reader create failed.",
                 storageGroup,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 55c277d..8c2d95d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.engine.compaction.StorageGroupCompactionTask;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
-import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -443,6 +442,42 @@ public class StorageGroupProcessor {
         recoverTsFiles(value, false);
       }
 
+      for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
+        long partitionNum = resource.getTimePartition();
+        updatePartitionFileVersion(partitionNum, resource.getVersion());
+      }
+      for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
+        long partitionNum = resource.getTimePartition();
+        updatePartitionFileVersion(partitionNum, resource.getVersion());
+      }
+      for (TsFileResource resource : upgradeSeqFileList) {
+        long partitionNum = resource.getTimePartition();
+        updatePartitionFileVersion(partitionNum, resource.getVersion());
+      }
+      for (TsFileResource resource : upgradeUnseqFileList) {
+        long partitionNum = resource.getTimePartition();
+        updatePartitionFileVersion(partitionNum, resource.getVersion());
+      }
+      updateLatestFlushedTime();
+
+      List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
+      for (TsFileResource resource : seqTsFileResources) {
+        long timePartitionId = resource.getTimePartition();
+        Map<String, Long> endTimeMap = new HashMap<>();
+        for (String deviceId : resource.getDevices()) {
+          long endTime = resource.getEndTime(deviceId);
+          endTimeMap.put(deviceId, endTime);
+        }
+        latestTimeForEachDevice
+            .computeIfAbsent(timePartitionId, l -> new HashMap<>())
+            .putAll(endTimeMap);
+        partitionLatestFlushedTimeForEachDevice
+            .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+            .putAll(endTimeMap);
+        globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
+      }
+
+      // leave it in the end
       String taskName =
           logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
       File mergingMods =
@@ -450,6 +485,7 @@ public class StorageGroupProcessor {
       if (mergingMods.exists()) {
         this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
       }
+
       RecoverMergeTask recoverMergeTask =
           new RecoverMergeTask(
               new ArrayList<>(tsFileManagement.getTsFileList(true)),
@@ -470,48 +506,9 @@ public class StorageGroupProcessor {
         mergingMods.delete();
       }
       recoverCompaction();
-      for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
-        long partitionNum = resource.getTimePartition();
-        updatePartitionFileVersion(partitionNum, resource.getVersion());
-      }
-      for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
-        long partitionNum = resource.getTimePartition();
-        updatePartitionFileVersion(partitionNum, resource.getVersion());
-      }
-      for (TsFileResource resource : upgradeSeqFileList) {
-        long partitionNum = resource.getTimePartition();
-        updatePartitionFileVersion(partitionNum, resource.getVersion());
-      }
-      for (TsFileResource resource : upgradeUnseqFileList) {
-        long partitionNum = resource.getTimePartition();
-        updatePartitionFileVersion(partitionNum, resource.getVersion());
-      }
-      updateLatestFlushedTime();
     } catch (IOException | MetadataException e) {
       throw new StorageGroupProcessorException(e);
     }
-
-    List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
-    for (TsFileResource resource : seqTsFileResources) {
-      long timePartitionId = resource.getTimePartition();
-      Map<String, Long> endTimeMap = new HashMap<>();
-      for (String deviceId : resource.getDevices()) {
-        long endTime = resource.getEndTime(deviceId);
-        endTimeMap.put(deviceId, endTime);
-      }
-      latestTimeForEachDevice
-          .computeIfAbsent(timePartitionId, l -> new HashMap<>())
-          .putAll(endTimeMap);
-      partitionLatestFlushedTimeForEachDevice
-          .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-          .putAll(endTimeMap);
-      globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
-    }
-
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
-    }
   }
 
   public class CompactionAllPartitionTask extends StorageGroupCompactionTask {
@@ -522,6 +519,10 @@ public class StorageGroupProcessor {
 
     @Override
     public Void call() {
+      logger.info(
+          "all partition in storage group {}: {}",
+          logicalStorageGroupName,
+          partitionLatestFlushedTimeForEachDevice.keySet());
       for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
         syncCompactOnePartition(
             timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
@@ -540,7 +541,7 @@ public class StorageGroupProcessor {
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
-                tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+                tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
       } catch (RejectedExecutionException e) {
         this.closeCompactionMergeCallBack(false, 0);
         logger.error(
@@ -707,7 +708,7 @@ public class StorageGroupProcessor {
       RestorableTsFileIOWriter writer;
       try {
         // this tsfile is not zero level, no need to perform redo wal
-        if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
+        if (TsFileResource.getMergeLevel(tsFileResource.getTsFile().getName()) > 0) {
           writer =
               recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer);
           if (writer.hasCrashed()) {
@@ -1986,7 +1987,10 @@ public class StorageGroupProcessor {
 
   private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
     logger.info(
-        "{}-{} submit a compaction merge task", logicalStorageGroupName, virtualStorageGroupId);
+        "{}-{} partition:{}, submit a compaction merge task",
+        logicalStorageGroupName,
+        virtualStorageGroupId,
+        timePartition);
     try {
       // fork and filter current tsfile, then commit then to compaction merge
       tsFileManagement.forkCurrentFileList(timePartition);
@@ -2000,11 +2004,29 @@ public class StorageGroupProcessor {
     }
   }
 
+  /** close recover compaction merge callback, to start continuous compaction */
+  private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
+    logger.info(
+        "{}-{} recover finished, submit continuous compaction task",
+        logicalStorageGroupName,
+        virtualStorageGroupId);
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      CompactionMergeTaskPoolManager.getInstance()
+          .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
+    }
+  }
+
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       syncCompactOnePartition(
           timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    } else {
+      logger.info(
+          "{}-{} partition:{}, do not have to submit a new compaction merge task",
+          logicalStorageGroupName,
+          virtualStorageGroupId,
+          timePartitionId);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index c3080d1..5c67e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -785,6 +785,11 @@ public class TsFileResource {
     return tsFileName;
   }
 
+  public static int getMergeLevel(String fileName) {
+    TsFileName tsFileName = getTsFileName(fileName);
+    return tsFileName.mergeCnt;
+  }
+
   public static TsFileResource modifyTsFileNameUnseqMergCnt(TsFileResource tsFileResource) {
     File tsFile = tsFileResource.getTsFile();
     String path = tsFile.getParent();
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index 2ce45a0..fe5aeb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -120,7 +120,7 @@ public class FileLoaderTest {
                 + IoTDBConstant.FILE_NAME_SEPARATOR
                 + rand
                 + IoTDBConstant.FILE_NAME_SEPARATOR
-                + "0.tsfile";
+                + "0-0.tsfile";
 
         File syncFile = new File(fileName);
         File dataFile =
@@ -244,7 +244,7 @@ public class FileLoaderTest {
                 + IoTDBConstant.FILE_NAME_SEPARATOR
                 + rand
                 + IoTDBConstant.FILE_NAME_SEPARATOR
-                + "0.tsfile";
+                + "0-0.tsfile";
 
         File syncFile = new File(fileName);
         File dataFile =
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
index 58126ef..7686021 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java
@@ -102,7 +102,7 @@ public class TsFileRewriteToolTest {
       boolean success = f.mkdir();
       Assert.assertTrue(success);
     }
-    path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0.tsfile";
+    path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0-0.tsfile";
   }
 
   @After