You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/26 01:53:14 UTC

[iotdb] branch master updated: [IOTDB-4185] Prevent RatisConsensus Start Race (#7115)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a7645c7c6 [IOTDB-4185] Prevent RatisConsensus Start Race (#7115)
8a7645c7c6 is described below

commit 8a7645c7c6f61787c8d8ffee02dc520cf8601424
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Fri Aug 26 09:53:08 2022 +0800

    [IOTDB-4185] Prevent RatisConsensus Start Race (#7115)
---
 .../iotdb/confignode/manager/ConfigManager.java    |  3 ++
 .../iotdb/confignode/manager/NodeManager.java      | 22 +++++++++------
 .../iotdb/confignode/manager/PartitionManager.java | 32 +++++++++++++---------
 3 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index efe3463030..8a963e27b9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -169,6 +169,9 @@ public class ConfigManager implements IManager {
     this.procedureManager = new ProcedureManager(this, procedureInfo);
     this.udfManager = new UDFManager(this, udfInfo);
     this.loadManager = new LoadManager(this);
+
+    // ConsensusManager must be initialized last, because it loads state from disk and
+    // reinitializes the managers above
     this.consensusManager = new ConsensusManager(this, stateMachine);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 432f2fcdc5..63abef6a34 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -68,6 +68,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -432,14 +433,19 @@ public class NodeManager {
 
   /** loop body of the heartbeat thread */
   private void heartbeatLoopBody() {
-    if (getConsensusManager().isLeader()) {
-      // Generate HeartbeatReq
-      THeartbeatReq heartbeatReq = genHeartbeatReq();
-      // Send heartbeat requests to all the registered DataNodes
-      pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
-      // Send heartbeat requests to all the registered ConfigNodes
-      pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
-    }
+    // the consensusManager of configManager may still be null here; do nothing until it is set
+    Optional.ofNullable(getConsensusManager())
+        .ifPresent(
+            consensusManager -> {
+              if (consensusManager.isLeader()) {
+                // Generate HeartbeatReq
+                THeartbeatReq heartbeatReq = genHeartbeatReq();
+                // Send heartbeat requests to all the registered DataNodes
+                pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
+                // Send heartbeat requests to all the registered ConfigNodes
+                pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
+              }
+            });
   }
 
   private THeartbeatReq genHeartbeatReq() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index cc6d9c6929..a5bb056a98 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -496,19 +497,24 @@ public class PartitionManager {
 
   /** Called by {@link PartitionManager#regionCleaner} Delete RegionGroups periodically. */
   public void clearDeletedRegions() {
-    if (getConsensusManager().isLeader()) {
-      final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
-      if (!deletedRegionSet.isEmpty()) {
-        LOGGER.info(
-            "DELETE REGIONS {} START",
-            deletedRegionSet.stream()
-                .map(TRegionReplicaSet::getRegionId)
-                .collect(Collectors.toList()));
-        deletedRegionSet.forEach(
-            regionReplicaSet -> removeRegionGroupCache(regionReplicaSet.regionId));
-        SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
-      }
-    }
+    // the consensusManager of configManager may still be null here; do nothing until it is set
+    Optional.ofNullable(getConsensusManager())
+        .ifPresent(
+            consensusManager -> {
+              if (consensusManager.isLeader()) {
+                final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
+                if (!deletedRegionSet.isEmpty()) {
+                  LOGGER.info(
+                      "DELETE REGIONS {} START",
+                      deletedRegionSet.stream()
+                          .map(TRegionReplicaSet::getRegionId)
+                          .collect(Collectors.toList()));
+                  deletedRegionSet.forEach(
+                      regionReplicaSet -> removeRegionGroupCache(regionReplicaSet.regionId));
+                  SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
+                }
+              }
+            });
   }
 
   public void startRegionCleaner() {