You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/26 02:48:48 UTC

[iotdb] branch tiered_storage updated: limit migration tasks number

This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new abd79edc9d5 limit migration tasks number
abd79edc9d5 is described below

commit abd79edc9d541256b04c983b5d8ce0d1cb404d45
Author: HeimingZ <zh...@qq.com>
AuthorDate: Fri May 26 10:48:33 2023 +0800

    limit migration tasks number
---
 .../iotdb/db/engine/migration/MigrationTask.java       |  1 +
 .../db/engine/migration/MigrationTaskManager.java      | 18 ++++++++++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index f6dfef0338d..b97ef7acf35 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -85,6 +85,7 @@ public abstract class MigrationTask implements Runnable {
       // try to set the final status to NORMAL to avoid migrate failure
       // TODO: this setting may occur side effects
       tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+      MigrationTaskManager.getInstance().decreaseMigrationTasksNum();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index c9766aaa706..a67a5358f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class MigrationTaskManager implements IService {
   private static final Logger logger = LoggerFactory.getLogger(MigrationTaskManager.class);
@@ -56,6 +57,9 @@ public class MigrationTaskManager implements IService {
       commonConfig.getDiskSpaceWarningThreshold() + 0.1;
   private static final double TIER_DISK_SPACE_SAFE_THRESHOLD =
       commonConfig.getDiskSpaceWarningThreshold() + 0.2;
+  private static final int MIGRATION_TASK_LIMIT = 20;
+  /** max concurrent migration tasks */
+  private final AtomicInteger migrationTasksNum = new AtomicInteger(0);
   /** single thread to schedule */
   private ScheduledExecutorService scheduler;
   /** single thread to sync syncingBuffer to disk */
@@ -121,13 +125,13 @@ public class MigrationTaskManager implements IService {
                     System.currentTimeMillis() - commonConfig.getTierTTLInMs()[currentTier],
                     iotdbConfig.getTimestampPrecision());
             if (!tsfile.stillLives(tierTTL)) {
-              submitMigrationTask(
+              trySubmitMigrationTask(
                   currentTier,
                   MigrationCause.TTL,
                   tsfile,
                   tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq()));
             } else if (needMigrationTiers.contains(currentTier)) {
-              submitMigrationTask(
+              trySubmitMigrationTask(
                   currentTier,
                   MigrationCause.DISK_SPACE,
                   tsfile,
@@ -141,12 +145,14 @@ public class MigrationTaskManager implements IService {
       }
     }
 
-    private void submitMigrationTask(
+    private void trySubmitMigrationTask(
         int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir)
         throws IOException {
-      if (!sourceTsFile.setStatus(TsFileResourceStatus.MIGRATING)) {
+      if (migrationTasksNum.get() >= MIGRATION_TASK_LIMIT
+          || !sourceTsFile.setStatus(TsFileResourceStatus.MIGRATING)) {
         return;
       }
+      migrationTasksNum.incrementAndGet();
       MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir);
       workers.submit(task);
       tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize();
@@ -177,6 +183,10 @@ public class MigrationTaskManager implements IService {
     }
   }
 
+  void decreaseMigrationTasksNum() {
+    migrationTasksNum.decrementAndGet();
+  }
+
   @Override
   public void stop() {
     if (scheduler != null) {