You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/21 11:56:07 UTC

[iotdb] 02/02: better way to start PipeMetaSyncer

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5904
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 224e65e3e382abf98bb248297c089dde9841a3cb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 21 19:09:00 2023 +0800

    better way to start PipeMetaSyncer
---
 .../manager/pipe/runtime/PipeMetaSyncer.java       | 47 +++++++++++++++++++---
 1 file changed, 41 insertions(+), 6 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..df080857e9a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -37,11 +37,10 @@ public class PipeMetaSyncer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncer.class);
 
-  private static final ScheduledExecutorService SYNC_EXECUTOR =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_META_SYNC_SERVICE.getName());
   // TODO: make this configurable
+  private static final long INITIAL_SYNC_DELAY_MINUTES = 1;
   private static final long SYNC_INTERVAL_MINUTES = 3;
+  private static ScheduledExecutorService syncExecutor;
 
   private final ConfigManager configManager;
 
@@ -52,11 +51,37 @@ public class PipeMetaSyncer {
   }
 
   public synchronized void start() {
+    stop();
+
+    // 1. wait for consensus layer ready
+    while (configManager.getConsensusManager() == null) {
+      try {
+        LOGGER.info("consensus layer is not ready, sleep 1s...");
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+      }
+    }
+
+    // 2. start sync executor
+    if (syncExecutor == null) {
+      syncExecutor =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              ThreadName.PIPE_META_SYNC_SERVICE.getName());
+      LOGGER.info("syncExecutor is started successfully.");
+    }
+
+    // 3. start meta sync task
     if (metaSyncFuture == null) {
       metaSyncFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
-      LOGGER.info("PipeMetaSyncer is started successfully.");
+              syncExecutor,
+              this::sync,
+              INITIAL_SYNC_DELAY_MINUTES,
+              SYNC_INTERVAL_MINUTES,
+              TimeUnit.MINUTES);
+      LOGGER.info("metaSyncFuture is submitted successfully.");
     }
   }
 
@@ -74,7 +99,17 @@ public class PipeMetaSyncer {
     if (metaSyncFuture != null) {
       metaSyncFuture.cancel(false);
       metaSyncFuture = null;
-      LOGGER.info("PipeMetaSyncer is stopped successfully.");
+      LOGGER.info("metaSyncFuture is cancelled successfully.");
+    }
+
+    try {
+      if (syncExecutor != null) {
+        syncExecutor.shutdown();
+        syncExecutor = null;
+        LOGGER.info("syncExecutor is shutdown successfully.");
+      }
+    } catch (Throwable t) {
+      LOGGER.error("Failed to shutdown syncExecutor", t);
     }
   }
 }