You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/27 11:23:13 UTC

[iotdb] branch rel/0.11 updated: [ref/0.11] change merge recover to async (#2327)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 652154a  [ref/0.11] change merge recover to async (#2327)
652154a is described below

commit 652154af31f062c57ef92d212c71679aac8673df
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sun Dec 27 19:22:49 2020 +0800

    [ref/0.11] change merge recover to async (#2327)
---
 .../compaction/CompactionMergeTaskPoolManager.java |  3 +-
 .../db/engine/compaction/TsFileManagement.java     | 15 ++++++
 .../engine/storagegroup/StorageGroupProcessor.java | 56 ++++++++++++++--------
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  4 +-
 4 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 830ea56..3473d54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
@@ -112,7 +111,7 @@ public class CompactionMergeTaskPoolManager implements IService {
     return ServiceType.COMPACTION_SERVICE;
   }
 
-  public void submitTask(CompactionMergeTask compactionMergeTask)
+  public void submitTask(Runnable compactionMergeTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
       pool.submit(compactionMergeTask);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index d10b4c1..f7f68c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -178,6 +178,21 @@ public abstract class TsFileManagement {
     }
   }
 
+  /**
+   * Runnable that performs compaction recovery by calling {@code recover()} on the enclosing
+   * instance and then invokes the supplied callback. Being a {@link Runnable}, it can be handed
+   * to an executor so that recovery runs off the submitting thread.
+   */
+  public class CompactionRecoverTask implements Runnable {
+
+    /** Invoked once {@code recover()} has returned or thrown. */
+    private final CloseCompactionMergeCallBack closeCompactionMergeCallBack;
+
+    /**
+     * @param closeCompactionMergeCallBack callback to invoke after recovery has been attempted
+     */
+    public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+      this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
+    }
+
+    @Override
+    public void run() {
+      try {
+        recover();
+      } finally {
+        // Always notify the submitter, even when recover() throws: without this the callback
+        // would be skipped and any state the submitter set before handing off the task would
+        // never be reset.
+        closeCompactionMergeCallBack.call();
+      }
+    }
+  }
+
   public void merge(boolean fullMerge, List<TsFileResource> seqMergeList,
       List<TsFileResource> unSeqMergeList, long dataTTL) {
     if (isUnseqMerging) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..a91661b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -335,8 +335,7 @@ public class StorageGroupProcessor {
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
         mergingMods.delete();
       }
-      tsFileManagement.recover();
-
+      recoverCompaction();
       for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
         long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
@@ -372,6 +371,24 @@ public class StorageGroupProcessor {
     }
   }
 
+  /**
+   * Submits an asynchronous compaction recover task for this storage group.
+   *
+   * <p>If the compaction pool is already terminated, nothing is submitted and an error is
+   * logged. Otherwise {@code compactionMergeWorking} is set to {@code true} before the task is
+   * handed to the pool; the task receives {@code closeCompactionMergeCallBack} as its completion
+   * callback. If the pool rejects the task, the callback is invoked directly here (presumably to
+   * undo the working flag; the callback is defined outside this view).
+   *
+   * <p>NOTE(review): recovery now runs on a pool thread while the caller goes on to read
+   * {@code tsFileManagement}; confirm that this concurrent access is safe.
+   */
+  private void recoverCompaction() {
+    CompactionMergeTaskPoolManager poolManager = CompactionMergeTaskPoolManager.getInstance();
+    if (poolManager.isTerminated()) {
+      logger.error("{} compaction pool not started ,recover failed",
+          storageGroupName);
+      return;
+    }
+
+    compactionMergeWorking = true;
+    logger.info("{} submit a compaction merge task", storageGroupName);
+    try {
+      poolManager.submitTask(
+          tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+    } catch (RejectedExecutionException e) {
+      this.closeCompactionMergeCallBack();
+      // Pass the exception as the trailing argument so the rejection cause and stack trace are
+      // logged instead of being dropped.
+      logger.error("{} compaction submit task failed", storageGroupName, e);
+    }
+  }
+
   private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
     long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
     if (fileVersion > oldVersion) {
@@ -583,8 +600,6 @@ public class StorageGroupProcessor {
         continue;
       }
 
-
-
       if (i != tsFiles.size() - 1 || !writer.canWrite()) {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
@@ -810,11 +825,11 @@ public class StorageGroupProcessor {
    * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence         whether is sequence
-   * @param start            start index of rows to be inserted in insertTabletPlan
-   * @param end              end index of rows to be inserted in insertTabletPlan
-   * @param results          result array
-   * @param timePartitionId  time partition id
+   * @param sequence whether the data is sequence
+   * @param start start index of rows to be inserted in insertTabletPlan
+   * @param end end index of rows to be inserted in insertTabletPlan
+   * @param results result array
+   * @param timePartitionId time partition id
    * @return false if any failure occurs when inserting the tablet, true otherwise
    */
   private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -974,9 +989,9 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId            time partition range
+   * @param timeRangeId time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence               whether is sequence or not
+   * @param sequence whether the processor is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1040,7 +1055,8 @@ public class StorageGroupProcessor {
       tsFileProcessor = new TsFileProcessor(storageGroupName,
           fsFactory.getFileWithParent(filePath), storageGroupInfo,
           versionController, this::closeUnsealedTsFileProcessorCallBack,
-          this::updateLatestFlushTimeCallback, true, partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+          this::updateLatestFlushTimeCallback, true,
+          partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
       if (enableMemControl) {
         TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
         tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -1052,7 +1068,8 @@ public class StorageGroupProcessor {
       tsFileProcessor = new TsFileProcessor(storageGroupName,
           fsFactory.getFileWithParent(filePath), storageGroupInfo,
           versionController, this::closeUnsealedTsFileProcessorCallBack,
-          this::unsequenceFlushCallback, false, partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+          this::unsequenceFlushCallback, false,
+          partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
       if (enableMemControl) {
         TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
         tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -1490,10 +1507,9 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param path      the timeseries path of the to be deleted.
+   * @param path the path of the timeseries to be deleted.
    * @param startTime the startTime of delete range.
-   * @param endTime   the endTime of delete range.
-   * @param planIndex
+   * @param endTime the endTime of delete range.
    */
   public void delete(PartialPath path, long startTime, long endTime, long planIndex)
       throws IOException {
@@ -2112,9 +2128,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
    * version number is the version number in the tsfile with a larger timestamp.
    *
-   * @param tsfileName  origin tsfile name
+   * @param tsfileName origin tsfile name
    * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
-   *                    1]
+   * 1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2178,8 +2194,8 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type            load type
-   * @param tsFileResource  tsfile resource to be loaded
+   * @param type load type
+   * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f56e7ca..dc05c1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -106,6 +106,8 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(Measurement.INSTANCE);
     registerManager.register(TVListAllocator.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
+    registerManager.register(MergeManager.getINSTANCE());
+    registerManager.register(CompactionMergeTaskPoolManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(StorageEngine.getInstance());
 
@@ -138,8 +140,6 @@ public class IoTDB implements IoTDBMBean {
 
     registerManager.register(SyncServerManager.getInstance());
     registerManager.register(UpgradeSevice.getINSTANCE());
-    registerManager.register(MergeManager.getINSTANCE());
-    registerManager.register(CompactionMergeTaskPoolManager.getInstance());
 
     logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");
   }