You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/21 11:56:07 UTC
[iotdb] 02/02: better way to start PipeMetaSyncer
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5904
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 224e65e3e382abf98bb248297c089dde9841a3cb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 21 19:09:00 2023 +0800
better way to start PipeMetaSyncer
---
.../manager/pipe/runtime/PipeMetaSyncer.java | 47 +++++++++++++++++++---
1 file changed, 41 insertions(+), 6 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..df080857e9a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -37,11 +37,10 @@ public class PipeMetaSyncer {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncer.class);
- private static final ScheduledExecutorService SYNC_EXECUTOR =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_META_SYNC_SERVICE.getName());
// TODO: make this configurable
+ private static final long INITIAL_SYNC_DELAY_MINUTES = 1;
private static final long SYNC_INTERVAL_MINUTES = 3;
+ private static ScheduledExecutorService syncExecutor;
private final ConfigManager configManager;
@@ -52,11 +51,37 @@ public class PipeMetaSyncer {
}
+ /**
+ * Starts the periodic pipe meta sync task.
+ *
+ * <p>Calls {@link #stop()} first, then polls once per second until {@code
+ * configManager.getConsensusManager()} is non-null, creates the single-thread scheduled
+ * executor if it does not exist, and finally schedules {@code sync} with an initial delay of
+ * {@code INITIAL_SYNC_DELAY_MINUTES} and a fixed delay of {@code SYNC_INTERVAL_MINUTES}.
+ *
+ * <p>If the calling thread is interrupted while waiting for the consensus layer, the interrupt
+ * flag is restored and the method returns without starting the executor or the sync task.
+ */
public synchronized void start() {
+ stop();
+
+ // 1. wait for consensus layer ready
+ while (configManager.getConsensusManager() == null) {
+ try {
+ LOGGER.info("consensus layer is not ready, sleep 1s...");
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag and give up. With the flag set, every subsequent sleep()
+ // throws immediately, so continuing the loop would busy-spin and flood the log while
+ // holding this object's monitor.
+ Thread.currentThread().interrupt();
+ LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+ return;
+ }
+ }
+
+ // 2. start sync executor
+ if (syncExecutor == null) {
+ syncExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_META_SYNC_SERVICE.getName());
+ LOGGER.info("syncExecutor is started successfully.");
+ }
+
+ // 3. start meta sync task
if (metaSyncFuture == null) {
metaSyncFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
- LOGGER.info("PipeMetaSyncer is started successfully.");
+ syncExecutor,
+ this::sync,
+ INITIAL_SYNC_DELAY_MINUTES,
+ SYNC_INTERVAL_MINUTES,
+ TimeUnit.MINUTES);
+ LOGGER.info("metaSyncFuture is submitted successfully.");
}
}
@@ -74,7 +99,17 @@ public class PipeMetaSyncer {
if (metaSyncFuture != null) {
metaSyncFuture.cancel(false);
metaSyncFuture = null;
- LOGGER.info("PipeMetaSyncer is stopped successfully.");
+ LOGGER.info("metaSyncFuture is cancelled successfully.");
+ }
+
+ try {
+ if (syncExecutor != null) {
+ syncExecutor.shutdown();
+ syncExecutor = null;
+ LOGGER.info("syncExecutor is shutdown successfully.");
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Failed to shutdown syncExecutor", t);
}
}
}