You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/22 14:50:26 UTC

[iotdb] 04/06: safely start pipe tasks

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86f0e554bb6b3c17f4ddccaafdafa21379dc7b26
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 22 19:33:53 2023 +0800

    safely start pipe tasks
---
 .../statemachine/ConfigRegionStateMachine.java      |  6 ++++--
 .../manager/pipe/runtime/PipeMetaSyncer.java        | 21 ++++++++++++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 53e7f9ae688..4964ba79774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -64,7 +64,7 @@ public class ConfigRegionStateMachine
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class);
 
   private static final ExecutorService threadPool =
-      IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
+      IoTDBThreadPoolFactory.newCachedThreadPool("ConfigNode-Manager-Recovery");
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
@@ -218,7 +218,9 @@ public class ConfigRegionStateMachine
       // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
       // initialized after notifyLeaderChanged finished
       threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
-      configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
+
+      threadPool.submit(
+          () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..4d6ebb04ff0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,8 @@ public class PipeMetaSyncer {
   private static final ScheduledExecutorService SYNC_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_META_SYNC_SERVICE.getName());
-  // TODO: make this configurable
+  // TODO: make these configurable
+  private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
   private static final long SYNC_INTERVAL_MINUTES = 3;
 
   private final ConfigManager configManager;
@@ -52,15 +53,29 @@ public class PipeMetaSyncer {
   }
 
   public synchronized void start() {
+    while (configManager.getConsensusManager() == null) {
+      try {
+        LOGGER.info("consensus layer is not ready, sleep 1s...");
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+      }
+    }
+
     if (metaSyncFuture == null) {
       metaSyncFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+              SYNC_EXECUTOR,
+              this::sync,
+              INITIAL_SYNC_DELAY_MINUTES,
+              SYNC_INTERVAL_MINUTES,
+              TimeUnit.MINUTES);
       LOGGER.info("PipeMetaSyncer is started successfully.");
     }
   }
 
-  private void sync() {
+  private synchronized void sync() {
     final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(