You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/26 01:53:14 UTC
[iotdb] branch master updated: [IOTDB-4185] Prevent RatisConsensus Start Race (#7115)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8a7645c7c6 [IOTDB-4185] Prevent RatisConsensus Start Race (#7115)
8a7645c7c6 is described below
commit 8a7645c7c6f61787c8d8ffee02dc520cf8601424
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Fri Aug 26 09:53:08 2022 +0800
[IOTDB-4185] Prevent RatisConsensus Start Race (#7115)
---
.../iotdb/confignode/manager/ConfigManager.java | 3 ++
.../iotdb/confignode/manager/NodeManager.java | 22 +++++++++------
.../iotdb/confignode/manager/PartitionManager.java | 32 +++++++++++++---------
3 files changed, 36 insertions(+), 21 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index efe3463030..8a963e27b9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -169,6 +169,9 @@ public class ConfigManager implements IManager {
this.procedureManager = new ProcedureManager(this, procedureInfo);
this.udfManager = new UDFManager(this, udfInfo);
this.loadManager = new LoadManager(this);
+
+ // ConsensusManager must be initialized last, because it loads state from disk and reinitializes
+ // the managers above
this.consensusManager = new ConsensusManager(this, stateMachine);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 432f2fcdc5..63abef6a34 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -68,6 +68,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -432,14 +433,19 @@ public class NodeManager {
/** loop body of the heartbeat thread */
private void heartbeatLoopBody() {
- if (getConsensusManager().isLeader()) {
- // Generate HeartbeatReq
- THeartbeatReq heartbeatReq = genHeartbeatReq();
- // Send heartbeat requests to all the registered DataNodes
- pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
- // Send heartbeat requests to all the registered ConfigNodes
- pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
- }
+ // the ConsensusManager is created last in ConfigManager, so it may still be null at this time
+ Optional.ofNullable(getConsensusManager())
+ .ifPresent(
+ consensusManager -> {
+ if (consensusManager.isLeader()) {
+ // Generate HeartbeatReq
+ THeartbeatReq heartbeatReq = genHeartbeatReq();
+ // Send heartbeat requests to all the registered DataNodes
+ pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
+ // Send heartbeat requests to all the registered ConfigNodes
+ pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
+ }
+ });
}
private THeartbeatReq genHeartbeatReq() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index cc6d9c6929..a5bb056a98 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -496,19 +497,24 @@ public class PartitionManager {
/** Called by {@link PartitionManager#regionCleaner} Delete RegionGroups periodically. */
public void clearDeletedRegions() {
- if (getConsensusManager().isLeader()) {
- final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
- if (!deletedRegionSet.isEmpty()) {
- LOGGER.info(
- "DELETE REGIONS {} START",
- deletedRegionSet.stream()
- .map(TRegionReplicaSet::getRegionId)
- .collect(Collectors.toList()));
- deletedRegionSet.forEach(
- regionReplicaSet -> removeRegionGroupCache(regionReplicaSet.regionId));
- SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
- }
- }
+ // the ConsensusManager is created last in ConfigManager, so it may still be null at this time
+ Optional.ofNullable(getConsensusManager())
+ .ifPresent(
+ consensusManager -> {
+ if (consensusManager.isLeader()) {
+ final Set<TRegionReplicaSet> deletedRegionSet = partitionInfo.getDeletedRegionSet();
+ if (!deletedRegionSet.isEmpty()) {
+ LOGGER.info(
+ "DELETE REGIONS {} START",
+ deletedRegionSet.stream()
+ .map(TRegionReplicaSet::getRegionId)
+ .collect(Collectors.toList()));
+ deletedRegionSet.forEach(
+ regionReplicaSet -> removeRegionGroupCache(regionReplicaSet.regionId));
+ SyncDataNodeClientPool.getInstance().deleteRegions(deletedRegionSet);
+ }
+ }
+ });
}
public void startRegionCleaner() {