You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/10/02 14:08:19 UTC

[iotdb] branch master updated: recover compaction in the last position (#7490)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9749543615 recover compaction in the last position (#7490)
9749543615 is described below

commit 97495436157a0b45449ba226d6dacdb8b29d6bb4
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sun Oct 2 22:08:12 2022 +0800

    recover compaction in the last position (#7490)
---
 .../apache/iotdb/db/engine/compaction/CompactionTaskManager.java    | 6 +++++-
 server/src/main/java/org/apache/iotdb/db/service/DataNode.java      | 2 +-
 server/src/main/java/org/apache/iotdb/db/service/IoTDB.java         | 2 +-
 server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java      | 2 +-
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index bb5af28d03..46250806e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -78,6 +78,7 @@ public class CompactionTaskManager implements IService {
   private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private volatile boolean init = false;
 
   public static CompactionTaskManager getInstance() {
     return INSTANCE;
@@ -98,6 +99,7 @@ public class CompactionTaskManager implements IService {
           x ->
               CompactionMetricsRecorder.recordTaskInfo(
                   x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size()));
+      init = true;
     }
     logger.info("Compaction task manager started.");
   }
@@ -214,7 +216,9 @@ public class CompactionTaskManager implements IService {
    */
   public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compactionTask)
       throws InterruptedException {
-    if (!candidateCompactionTaskQueue.contains(compactionTask) && !isTaskRunning(compactionTask)) {
+    if (init
+        && !candidateCompactionTaskQueue.contains(compactionTask)
+        && !isTaskRunning(compactionTask)) {
       compactionTask.setSourceFilesToCompactionCandidate();
       candidateCompactionTaskQueue.put(compactionTask);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 0b112bec46..57c58d8c03 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -274,7 +274,6 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(new JMXService());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
-    registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
 
     // close wal when using ratis consensus
@@ -320,6 +319,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(RegionMigrateService.getInstance());
 
     registerManager.register(MetricService.getInstance());
+    registerManager.register(CompactionTaskManager.getInstance());
   }
 
   /** set up RPC and protocols after DataNode is available */
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 8785aad28e..cca8a7dc2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -142,7 +142,6 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(new JMXService());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
-    registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(SyncService.getInstance());
     registerManager.register(WALManager.getInstance());
@@ -159,6 +158,7 @@ public class IoTDB implements IoTDBMBean {
                 + File.separator
                 + "udf"
                 + File.separator));
+    registerManager.register(CompactionTaskManager.getInstance());
 
     // in cluster mode, RPC service is not enabled.
     if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index 1aff419e56..8a043bf6e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -130,7 +130,6 @@ public class NewIoTDB implements NewIoTDBMBean {
     registerManager.register(new JMXService());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
-    registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(SyncService.getInstance());
     registerManager.register(WALManager.getInstance());
@@ -178,6 +177,7 @@ public class NewIoTDB implements NewIoTDBMBean {
     registerManager.register(TriggerRegistrationService.getInstance());
     registerManager.register(ContinuousQueryService.getInstance());
     registerManager.register(MetricService.getInstance());
+    registerManager.register(CompactionTaskManager.getInstance());
 
     logger.info("IoTDB configuration: " + config.getConfigMessage());
     logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");