You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/10/02 14:08:19 UTC
[iotdb] branch master updated: recover compaction in the last position (#7490)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9749543615 recover compaction in the last position (#7490)
9749543615 is described below
commit 97495436157a0b45449ba226d6dacdb8b29d6bb4
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sun Oct 2 22:08:12 2022 +0800
recover compaction in the last position (#7490)
---
.../apache/iotdb/db/engine/compaction/CompactionTaskManager.java | 6 +++++-
server/src/main/java/org/apache/iotdb/db/service/DataNode.java | 2 +-
server/src/main/java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java | 2 +-
4 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index bb5af28d03..46250806e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -78,6 +78,7 @@ public class CompactionTaskManager implements IService {
private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private volatile boolean init = false;
public static CompactionTaskManager getInstance() {
return INSTANCE;
@@ -98,6 +99,7 @@ public class CompactionTaskManager implements IService {
x ->
CompactionMetricsRecorder.recordTaskInfo(
x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size()));
+ init = true;
}
logger.info("Compaction task manager started.");
}
@@ -214,7 +216,9 @@ public class CompactionTaskManager implements IService {
*/
public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compactionTask)
throws InterruptedException {
- if (!candidateCompactionTaskQueue.contains(compactionTask) && !isTaskRunning(compactionTask)) {
+ if (init
+ && !candidateCompactionTaskQueue.contains(compactionTask)
+ && !isTaskRunning(compactionTask)) {
compactionTask.setSourceFilesToCompactionCandidate();
candidateCompactionTaskQueue.put(compactionTask);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 0b112bec46..57c58d8c03 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -274,7 +274,6 @@ public class DataNode implements DataNodeMBean {
registerManager.register(new JMXService());
registerManager.register(FlushManager.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
- registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
// close wal when using ratis consensus
@@ -320,6 +319,7 @@ public class DataNode implements DataNodeMBean {
registerManager.register(RegionMigrateService.getInstance());
registerManager.register(MetricService.getInstance());
+ registerManager.register(CompactionTaskManager.getInstance());
}
/** set up RPC and protocols after DataNode is available */
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 8785aad28e..cca8a7dc2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -142,7 +142,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(new JMXService());
registerManager.register(FlushManager.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
- registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(SyncService.getInstance());
registerManager.register(WALManager.getInstance());
@@ -159,6 +158,7 @@ public class IoTDB implements IoTDBMBean {
+ File.separator
+ "udf"
+ File.separator));
+ registerManager.register(CompactionTaskManager.getInstance());
// in cluster mode, RPC service is not enabled.
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index 1aff419e56..8a043bf6e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -130,7 +130,6 @@ public class NewIoTDB implements NewIoTDBMBean {
registerManager.register(new JMXService());
registerManager.register(FlushManager.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
- registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(SyncService.getInstance());
registerManager.register(WALManager.getInstance());
@@ -178,6 +177,7 @@ public class NewIoTDB implements NewIoTDBMBean {
registerManager.register(TriggerRegistrationService.getInstance());
registerManager.register(ContinuousQueryService.getInstance());
registerManager.register(MetricService.getInstance());
+ registerManager.register(CompactionTaskManager.getInstance());
logger.info("IoTDB configuration: " + config.getConfigMessage());
logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");