You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/26 02:48:48 UTC
[iotdb] branch tiered_storage updated: limit migration tasks number
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new abd79edc9d5 limit migration tasks number
abd79edc9d5 is described below
commit abd79edc9d541256b04c983b5d8ce0d1cb404d45
Author: HeimingZ <zh...@qq.com>
AuthorDate: Fri May 26 10:48:33 2023 +0800
limit migration tasks number
---
.../iotdb/db/engine/migration/MigrationTask.java | 1 +
.../db/engine/migration/MigrationTaskManager.java | 18 ++++++++++++++----
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index f6dfef0338d..b97ef7acf35 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -85,6 +85,7 @@ public abstract class MigrationTask implements Runnable {
// try to set the final status to NORMAL to avoid migrate failure
// TODO: this setting may occur side effects
tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ MigrationTaskManager.getInstance().decreaseMigrationTasksNum();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index c9766aaa706..a67a5358f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -45,6 +45,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class MigrationTaskManager implements IService {
private static final Logger logger = LoggerFactory.getLogger(MigrationTaskManager.class);
@@ -56,6 +57,9 @@ public class MigrationTaskManager implements IService {
commonConfig.getDiskSpaceWarningThreshold() + 0.1;
private static final double TIER_DISK_SPACE_SAFE_THRESHOLD =
commonConfig.getDiskSpaceWarningThreshold() + 0.2;
+ private static final int MIGRATION_TASK_LIMIT = 20;
+ /** number of migration tasks submitted but not yet finished; no new task is submitted once it reaches MIGRATION_TASK_LIMIT */
+ private final AtomicInteger migrationTasksNum = new AtomicInteger(0);
/** single thread to schedule */
private ScheduledExecutorService scheduler;
/** single thread to sync syncingBuffer to disk */
@@ -121,13 +125,13 @@ public class MigrationTaskManager implements IService {
System.currentTimeMillis() - commonConfig.getTierTTLInMs()[currentTier],
iotdbConfig.getTimestampPrecision());
if (!tsfile.stillLives(tierTTL)) {
- submitMigrationTask(
+ trySubmitMigrationTask(
currentTier,
MigrationCause.TTL,
tsfile,
tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq()));
} else if (needMigrationTiers.contains(currentTier)) {
- submitMigrationTask(
+ trySubmitMigrationTask(
currentTier,
MigrationCause.DISK_SPACE,
tsfile,
@@ -141,12 +145,14 @@ public class MigrationTaskManager implements IService {
}
}
- private void submitMigrationTask(
+ private void trySubmitMigrationTask(
int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir)
throws IOException {
- if (!sourceTsFile.setStatus(TsFileResourceStatus.MIGRATING)) {
+ if (migrationTasksNum.get() >= MIGRATION_TASK_LIMIT
+ || !sourceTsFile.setStatus(TsFileResourceStatus.MIGRATING)) {
return;
}
+ migrationTasksNum.incrementAndGet();
MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir);
workers.submit(task);
tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize();
@@ -177,6 +183,10 @@ public class MigrationTaskManager implements IService {
}
}
+ void decreaseMigrationTasksNum() {
+ migrationTasksNum.decrementAndGet();
+ }
+
@Override
public void stop() {
if (scheduler != null) {