You are viewing a plain-text version of this content. The link to its canonical (original) version was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/12 15:43:19 UTC
[iotdb] branch master updated: [IOTDB-4112] Decoupling heartbeat scheduled executor service from LoadManager (#6979)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5516fe17ff [IOTDB-4112] Decoupling heartbeat scheduled executor service from LoadManager (#6979)
5516fe17ff is described below
commit 5516fe17ffdd0831a7d1caf7a1a4659ceaaa6bad
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Fri Aug 12 23:43:12 2022 +0800
[IOTDB-4112] Decoupling heartbeat scheduled executor service from LoadManager (#6979)
---
.../statemachine/PartitionRegionStateMachine.java | 8 +-
.../iotdb/confignode/manager/ConfigManager.java | 5 +-
.../iotdb/confignode/manager/NodeManager.java | 229 ++++++++++++++++--
.../iotdb/confignode/manager/PartitionManager.java | 52 +++-
.../iotdb/confignode/manager/load/LoadManager.java | 261 ++-------------------
.../manager/load/LoadManagerMetrics.java | 47 ++--
.../manager/load/balancer/RegionBalancer.java | 10 +-
.../manager/load/balancer/RouteBalancer.java | 23 +-
.../load/heartbeat/ConfigNodeHeartbeatCache.java | 4 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 3 -
.../procedure/env/DataNodeRemoveHandler.java | 5 +-
11 files changed, 330 insertions(+), 317 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 4259e5df71..7af80b66f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-/** Statemachine for PartitionRegion */
+/** StateMachine for PartitionRegion */
public class PartitionRegionStateMachine implements IStateMachine, IStateMachine.EventApi {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
@@ -141,13 +141,15 @@ public class PartitionRegionStateMachine implements IStateMachine, IStateMachine
if (currentNode.equals(newLeader)) {
LOGGER.info("Current node {} becomes Leader", newLeader);
configManager.getProcedureManager().shiftExecutor(true);
- configManager.getLoadManager().start();
+ configManager.getLoadManager().startLoadBalancingService();
+ configManager.getNodeManager().startHeartbeatService();
configManager.getPartitionManager().startRegionCleaner();
} else {
LOGGER.info(
"Current node {} is not longer the leader, the new leader is {}", currentNode, newLeader);
configManager.getProcedureManager().shiftExecutor(false);
- configManager.getLoadManager().stop();
+ configManager.getLoadManager().stopLoadBalancingService();
+ configManager.getNodeManager().stopHeartbeatService();
configManager.getPartitionManager().stopRegionCleaner();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a7a53a8f6a..b5b93d5267 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -245,7 +245,7 @@ public class ConfigManager implements IManager {
.sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
.collect(Collectors.toList());
Map<Integer, String> nodeStatus = new HashMap<>();
- getLoadManager()
+ getNodeManager()
.getNodeCacheMap()
.forEach(
(nodeId, heartbeatCache) ->
@@ -858,7 +858,8 @@ public class ConfigManager implements IManager {
.getRegionInfoList()
.forEach(
regionInfo -> {
- Map<TConsensusGroupId, Integer> allLeadership = loadManager.getAllLeadership();
+ Map<TConsensusGroupId, Integer> allLeadership =
+ getPartitionManager().getAllLeadership();
if (!allLeadership.isEmpty()) {
String regionType =
regionInfo.getDataNodeId()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 881f0b29d4..f188ae7148 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -21,11 +21,19 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -37,6 +45,8 @@ import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
@@ -46,6 +56,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -57,22 +68,44 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
/** NodeManager manages cluster node addition and removal requests */
public class NodeManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
+
+ public static final TEndPoint CURRENT_NODE =
+ new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
+
private final IManager configManager;
private final NodeInfo nodeInfo;
private final ReentrantLock removeConfigNodeLock;
+ /** Heartbeat executor service */
+ // Monitor for leadership change
+ private final Object scheduleMonitor = new Object();
+ // Map<NodeId, INodeCache>
+ private final Map<Integer, INodeCache> nodeCacheMap;
+ private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+ private Future<?> currentHeartbeatFuture;
+ private final ScheduledExecutorService heartBeatExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
+
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
this.nodeInfo = nodeInfo;
this.removeConfigNodeLock = new ReentrantLock();
+ this.nodeCacheMap = new ConcurrentHashMap<>();
}
private void setGlobalConfig(DataNodeRegisterResp dataSet) {
@@ -181,15 +214,6 @@ public class NodeManager {
return nodeInfo.getRegisteredDataNodeCount();
}
- /**
- * Only leader use this interface
- *
- * @return The number of registered TotalNodes
- */
- public int getRegisteredNodeCount() {
- return nodeInfo.getRegisteredNodeCount();
- }
-
/**
* Only leader use this interface
*
@@ -220,15 +244,6 @@ public class NodeManager {
return dataNodeLocations;
}
- private INodeCache getNodeCache(int nodeId) {
- return getLoadManager().getNodeCacheMap().get(nodeId);
- }
-
- private String getNodeStatus(int nodeId) {
- INodeCache nodeCache = getNodeCache(nodeId);
- return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus();
- }
-
public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
@@ -293,7 +308,7 @@ public class NodeManager {
removeConfigNodeLock.tryLock();
try {
// Check OnlineConfigNodes number
- if (getLoadManager().getOnlineConfigNodes().size() <= 1) {
+ if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
.setMessage(
"Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
@@ -331,7 +346,7 @@ public class NodeManager {
private TSStatus transferLeader(
RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
TConfigNodeLocation newLeader =
- getLoadManager().getOnlineConfigNodes().stream()
+ filterConfigNodeThroughStatus(NodeStatus.Running).stream()
.filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
.findAny()
.get();
@@ -384,6 +399,176 @@ public class NodeManager {
return dataNodeResponseStatus;
}
+ /** Start the heartbeat service */
+ public void startHeartbeatService() {
+ synchronized (scheduleMonitor) {
+ if (currentHeartbeatFuture == null) {
+ currentHeartbeatFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ heartBeatExecutor,
+ this::heartbeatLoopBody,
+ 0,
+ HEARTBEAT_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Heartbeat service is started successfully.");
+ }
+ }
+ }
+
+ /** loop body of the heartbeat thread */
+ private void heartbeatLoopBody() {
+ if (getConsensusManager().isLeader()) {
+ // Generate HeartbeatReq
+ THeartbeatReq heartbeatReq = genHeartbeatReq();
+ // Send heartbeat requests to all the registered DataNodes
+ pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
+ // Send heartbeat requests to all the registered ConfigNodes
+ pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
+ }
+ }
+
+ private THeartbeatReq genHeartbeatReq() {
+ /* Generate heartbeat request */
+ THeartbeatReq heartbeatReq = new THeartbeatReq();
+ heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+ // We update RegionGroups' leadership in every 5 heartbeat loop
+ heartbeatReq.setNeedJudgeLeader(heartbeatCounter.get() % 5 == 0);
+ // We sample DataNode's load in every 10 heartbeat loop
+ heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
+
+ /* Update heartbeat counter */
+ heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+ return heartbeatReq;
+ }
+
+ /**
+ * Send heartbeat requests to all the Registered DataNodes
+ *
+ * @param registeredDataNodes DataNodes that registered in cluster
+ */
+ private void pingRegisteredDataNodes(
+ THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+ // Send heartbeat requests
+ for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
+ DataNodeHeartbeatHandler handler =
+ new DataNodeHeartbeatHandler(
+ dataNodeInfo.getLocation(),
+ (DataNodeHeartbeatCache)
+ nodeCacheMap.computeIfAbsent(
+ dataNodeInfo.getLocation().getDataNodeId(),
+ empty -> new DataNodeHeartbeatCache()),
+ getPartitionManager().getRegionGroupCacheMap());
+ AsyncDataNodeHeartbeatClientPool.getInstance()
+ .getDataNodeHeartBeat(
+ dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+ }
+ }
+
+ /**
+ * Send heartbeat requests to all the Registered ConfigNodes
+ *
+ * @param registeredConfigNodes ConfigNodes that registered in cluster
+ */
+ private void pingRegisteredConfigNodes(
+ THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+ // Send heartbeat requests
+ for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+ if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
+ // Skip itself
+ nodeCacheMap.putIfAbsent(
+ configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
+ continue;
+ }
+
+ ConfigNodeHeartbeatHandler handler =
+ new ConfigNodeHeartbeatHandler(
+ (ConfigNodeHeartbeatCache)
+ nodeCacheMap.computeIfAbsent(
+ configNodeLocation.getConfigNodeId(),
+ empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
+ AsyncConfigNodeHeartbeatClientPool.getInstance()
+ .getConfigNodeHeartBeat(
+ configNodeLocation.getInternalEndPoint(),
+ heartbeatReq.getHeartbeatTimestamp(),
+ handler);
+ }
+ }
+
+ /** Stop the heartbeat service */
+ public void stopHeartbeatService() {
+ synchronized (scheduleMonitor) {
+ if (currentHeartbeatFuture != null) {
+ currentHeartbeatFuture.cancel(false);
+ currentHeartbeatFuture = null;
+ nodeCacheMap.clear();
+ LOGGER.info("Heartbeat service is stopped successfully.");
+ }
+ }
+ }
+
+ public Map<Integer, INodeCache> getNodeCacheMap() {
+ return nodeCacheMap;
+ }
+
+ /**
+ * Safely get the specific Node's current status
+ *
+ * @param nodeId The specific Node's index
+ * @return The specific Node's current status if the nodeCache contains it, Unknown otherwise
+ */
+ private String getNodeStatus(int nodeId) {
+ INodeCache nodeCache = nodeCacheMap.get(nodeId);
+ return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus();
+ }
+
+ /**
+ * Filter the registered ConfigNodes through the specific NodeStatus
+ *
+ * @param status The specific NodeStatus
+ * @return Filtered ConfigNodes with the specific NodeStatus
+ */
+ public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus status) {
+ return getRegisteredConfigNodes().stream()
+ .filter(
+ registeredConfigNode -> {
+ int configNodeId = registeredConfigNode.getConfigNodeId();
+ return nodeCacheMap.containsKey(configNodeId)
+ && status.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Filter the registered DataNodes through the specific NodeStatus
+ *
+ * @param status The specific NodeStatus
+ * @return Filtered DataNodes with the specific NodeStatus
+ */
+ public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus status) {
+ return getRegisteredDataNodes().stream()
+ .filter(
+ registeredDataNode -> {
+ int id = registeredDataNode.getLocation().getDataNodeId();
+ return nodeCacheMap.containsKey(id)
+ && status.equals(nodeCacheMap.get(id).getNodeStatus());
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Get the loadScore of each DataNode
+ *
+ * @return Map<DataNodeId, loadScore>
+ */
+ public Map<Integer, Long> getAllLoadScores() {
+ Map<Integer, Long> result = new ConcurrentHashMap<>();
+
+ nodeCacheMap.forEach(
+ (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
+
+ return result;
+ }
+
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
return nodeInfo.getRegisteredConfigNodes();
}
@@ -396,7 +581,7 @@ public class NodeManager {
return configManager.getClusterSchemaManager();
}
- private LoadManager getLoadManager() {
- return configManager.getLoadManager();
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 5aeb9885e3..c66e9a72ae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -49,7 +49,9 @@ import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -86,15 +88,19 @@ public class PartitionManager {
private final ScheduledExecutorService regionCleaner;
private Future<?> currentRegionCleanerFuture;
+ // Map<RegionId, RegionGroupCache>
+ private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
+
public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
this.partitionInfo = partitionInfo;
this.regionCleaner =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Cleaner");
+ this.regionGroupCacheMap = new ConcurrentHashMap<>();
setSeriesPartitionExecutor();
}
- /** Construct SeriesPartitionExecutor by iotdb-confignode.propertis */
+ /** Construct SeriesPartitionExecutor by iotdb-confignode.properties */
private void setSeriesPartitionExecutor() {
ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
this.executor =
@@ -407,7 +413,7 @@ public class PartitionManager {
*
* @param physicalPlan GetNodesPathsPartitionReq
* @return SchemaNodeManagementPartitionDataSet that contains only existing matched
- * SchemaPartition and matched child paths aboveMtree
+ * SchemaPartition and matched child paths aboveMTree
*/
public DataSet getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) {
SchemaNodeManagementResp schemaNodeManagementResp;
@@ -501,11 +507,49 @@ public class PartitionManager {
/* Stop the RegionCleaner service */
currentRegionCleanerFuture.cancel(false);
currentRegionCleanerFuture = null;
+ regionGroupCacheMap.clear();
LOGGER.info("RegionCleaner is stopped successfully.");
}
}
}
+ public Map<TConsensusGroupId, IRegionGroupCache> getRegionGroupCacheMap() {
+ return regionGroupCacheMap;
+ }
+
+ /**
+ * Get the leadership of each RegionGroup
+ *
+ * @return Map<RegionGroupId, leader location>
+ */
+ public Map<TConsensusGroupId, Integer> getAllLeadership() {
+ Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
+ if (ConfigNodeDescriptor.getInstance()
+ .getConf()
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.MultiLeaderConsensus)) {
+ regionGroupCacheMap.forEach(
+ (consensusGroupId, regionGroupCache) -> {
+ if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) {
+ result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId());
+ }
+ });
+ getLoadManager()
+ .getRouteBalancer()
+ .getRouteMap()
+ .forEach(
+ (consensusGroupId, regionReplicaSet) ->
+ result.put(
+ consensusGroupId,
+ regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId()));
+ } else {
+ regionGroupCacheMap.forEach(
+ (consensusGroupId, regionGroupCache) ->
+ result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId()));
+ }
+ return result;
+ }
+
public ScheduledExecutorService getRegionCleaner() {
return regionCleaner;
}
@@ -514,10 +558,6 @@ public class PartitionManager {
return configManager.getConsensusManager();
}
- private NodeManager getNodeManager() {
- return configManager.getNodeManager();
- }
-
private ClusterSchemaManager getClusterSchemaManager() {
return configManager.getClusterSchemaManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 800ce1628d..abebafa3c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -18,12 +18,9 @@
*/
package org.apache.iotdb.confignode.manager.load;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -33,11 +30,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -51,12 +44,8 @@ import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
@@ -70,7 +59,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -83,19 +71,8 @@ public class LoadManager {
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
- private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
-
- public static final TEndPoint CURRENT_NODE =
- new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
-
private final IManager configManager;
- /** Heartbeat sample cache */
- // Map<NodeId, IHeartbeatStatistic>
- private final Map<Integer, INodeCache> nodeCacheMap;
- // Map<RegionId, RegionGroupCache>
- private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
-
/** Balancers */
private final RegionBalancer regionBalancer;
@@ -103,26 +80,16 @@ public class LoadManager {
private final RouteBalancer routeBalancer;
private final LoadManagerMetrics loadManagerMetrics;
- /** Heartbeat executor service */
- private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
-
- private Future<?> currentHeartbeatFuture;
- private final ScheduledExecutorService heartBeatExecutor =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
-
/** Load balancing executor service */
private Future<?> currentLoadBalancingFuture;
private final ScheduledExecutorService loadBalancingExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
-
- /** Monitor for leadership change */
+ // Monitor for leadership change
private final Object scheduleMonitor = new Object();
public LoadManager(IManager configManager) {
this.configManager = configManager;
- this.nodeCacheMap = new ConcurrentHashMap<>();
- this.regionGroupCacheMap = new ConcurrentHashMap<>();
this.regionBalancer = new RegionBalancer(configManager);
this.partitionBalancer = new PartitionBalancer(configManager);
@@ -193,77 +160,16 @@ public class LoadManager {
return routeBalancer.genLatestRegionRouteMap(getPartitionManager().getAllReplicaSets());
}
- /**
- * Get the loadScore of each DataNode
- *
- * @return Map<DataNodeId, loadScore>
- */
- public Map<Integer, Long> getAllLoadScores() {
- Map<Integer, Long> result = new ConcurrentHashMap<>();
-
- nodeCacheMap.forEach(
- (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
-
- return result;
- }
-
- /**
- * Get the leadership of each RegionGroup
- *
- * @return Map<RegionGroupId, leader location>
- */
- public Map<TConsensusGroupId, Integer> getAllLeadership() {
- Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
- if (ConfigNodeDescriptor.getInstance()
- .getConf()
- .getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.MultiLeaderConsensus)) {
- regionGroupCacheMap.forEach(
- (consensusGroupId, regionGroupCache) -> {
- if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) {
- result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId());
- }
- });
- routeBalancer
- .getRouteMap()
- .forEach(
- (consensusGroupId, regionReplicaSet) -> {
- result.put(
- consensusGroupId,
- regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId());
- });
- } else {
- regionGroupCacheMap.forEach(
- (consensusGroupId, regionGroupCache) ->
- result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId()));
- }
- return result;
- }
-
- /** Start the heartbeat service and the load balancing service */
- public void start() {
- LOGGER.debug("Start Heartbeat Service of LoadManager");
+ /** Start the load balancing service */
+ public void startLoadBalancingService() {
synchronized (scheduleMonitor) {
- /* Start the heartbeat service */
- if (currentHeartbeatFuture == null) {
- currentHeartbeatFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- heartBeatExecutor,
- this::heartbeatLoopBody,
- 0,
- HEARTBEAT_INTERVAL,
- TimeUnit.MILLISECONDS);
- LOGGER.info("Heartbeat service is started successfully.");
- }
-
- /* Start the load balancing service */
if (currentLoadBalancingFuture == null) {
currentLoadBalancingFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
loadBalancingExecutor,
this::updateNodeLoadStatistic,
0,
- HEARTBEAT_INTERVAL,
+ NodeManager.HEARTBEAT_INTERVAL,
TimeUnit.MILLISECONDS);
LOGGER.info("LoadBalancing service is started successfully.");
}
@@ -271,15 +177,11 @@ public class LoadManager {
}
}
- /** Stop the heartbeat service and the load balancing service */
- public void stop() {
+ /** Stop the load balancing service */
+ public void stopLoadBalancingService() {
loadManagerMetrics.removeMetrics();
- LOGGER.debug("Stop Heartbeat Service and LoadBalancing Service of LoadManager");
synchronized (scheduleMonitor) {
- if (currentHeartbeatFuture != null) {
- currentHeartbeatFuture.cancel(false);
- currentHeartbeatFuture = null;
- LOGGER.info("Heartbeat service is stopped successfully.");
+ if (currentLoadBalancingFuture != null) {
currentLoadBalancingFuture.cancel(false);
currentLoadBalancingFuture = null;
LOGGER.info("LoadBalancing service is stopped successfully.");
@@ -293,7 +195,8 @@ public class LoadManager {
AtomicBoolean existChangeLeaderDataRegionGroup = new AtomicBoolean(false);
boolean isNeedBroadcast = false;
- nodeCacheMap
+ getNodeManager()
+ .getNodeCacheMap()
.values()
.forEach(
nodeCache -> {
@@ -304,7 +207,8 @@ public class LoadManager {
}
});
- regionGroupCacheMap
+ getPartitionManager()
+ .getRegionGroupCacheMap()
.values()
.forEach(
regionGroupCache -> {
@@ -347,7 +251,8 @@ public class LoadManager {
public void broadcastLatestRegionRouteMap() {
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- getOnlineDataNodes()
+ getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running)
.forEach(
onlineDataNode ->
dataNodeLocationMap.put(
@@ -365,138 +270,6 @@ public class LoadManager {
LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap finished.");
}
- /** loop body of the heartbeat thread */
- private void heartbeatLoopBody() {
- if (getConsensusManager().isLeader()) {
- // Generate HeartbeatReq
- THeartbeatReq heartbeatReq = genHeartbeatReq();
- // Send heartbeat requests to all the registered DataNodes
- pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
- // Send heartbeat requests to all the registered ConfigNodes
- pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
- }
- }
-
- private THeartbeatReq genHeartbeatReq() {
- /* Generate heartbeat request */
- THeartbeatReq heartbeatReq = new THeartbeatReq();
- heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
- // We update RegionGroups' leadership in every 5 heartbeat loop
- heartbeatReq.setNeedJudgeLeader(heartbeatCounter.get() % 5 == 0);
- // We sample DataNode's load in every 10 heartbeat loop
- heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
- /* Update heartbeat counter */
- heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
- return heartbeatReq;
- }
-
- /**
- * Send heartbeat requests to all the Registered DataNodes
- *
- * @param registeredDataNodes DataNodes that registered in cluster
- */
- private void pingRegisteredDataNodes(
- THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
- // Send heartbeat requests
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- DataNodeHeartbeatHandler handler =
- new DataNodeHeartbeatHandler(
- dataNodeInfo.getLocation(),
- (DataNodeHeartbeatCache)
- nodeCacheMap.computeIfAbsent(
- dataNodeInfo.getLocation().getDataNodeId(),
- empty -> new DataNodeHeartbeatCache()),
- regionGroupCacheMap);
- AsyncDataNodeHeartbeatClientPool.getInstance()
- .getDataNodeHeartBeat(
- dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
- }
- }
-
- /**
- * Send heartbeat requests to all the Registered ConfigNodes
- *
- * @param registeredConfigNodes ConfigNodes that registered in cluster
- */
- private void pingRegisteredConfigNodes(
- THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
- // Send heartbeat requests
- for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
- if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
- // Skip itself
- nodeCacheMap.putIfAbsent(
- configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
- continue;
- }
-
- ConfigNodeHeartbeatHandler handler =
- new ConfigNodeHeartbeatHandler(
- (ConfigNodeHeartbeatCache)
- nodeCacheMap.computeIfAbsent(
- configNodeLocation.getConfigNodeId(),
- empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
- AsyncConfigNodeHeartbeatClientPool.getInstance()
- .getConfigNodeHeartBeat(
- configNodeLocation.getInternalEndPoint(),
- heartbeatReq.getHeartbeatTimestamp(),
- handler);
- }
- }
-
- /**
- * When a node is removed, clear the node's cache
- *
- * @param nodeId removed node id
- */
- public void removeNodeHeartbeatHandCache(Integer nodeId) {
- nodeCacheMap.remove(nodeId);
- }
-
- public List<TConfigNodeLocation> getOnlineConfigNodes() {
- return getNodeManager().getRegisteredConfigNodes().stream()
- .filter(
- registeredConfigNode -> {
- int configNodeId = registeredConfigNode.getConfigNodeId();
- return nodeCacheMap.containsKey(configNodeId)
- && NodeStatus.Running.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
- })
- .collect(Collectors.toList());
- }
-
- public List<TDataNodeConfiguration> getOnlineDataNodes() {
- return getNodeManager().getRegisteredDataNodes().stream()
- .filter(
- registeredDataNode -> {
- int id = registeredDataNode.getLocation().getDataNodeId();
- return nodeCacheMap.containsKey(id)
- && NodeStatus.Running.equals(nodeCacheMap.get(id).getNodeStatus());
- })
- .collect(Collectors.toList());
- }
-
- public List<TConfigNodeLocation> getUnknownConfigNodes() {
- return getNodeManager().getRegisteredConfigNodes().stream()
- .filter(
- registeredConfigNode -> {
- int configNodeId = registeredConfigNode.getConfigNodeId();
- return nodeCacheMap.containsKey(configNodeId)
- && NodeStatus.Unknown.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
- })
- .collect(Collectors.toList());
- }
-
- public List<TDataNodeConfiguration> getUnknownDataNodes() {
- return getNodeManager().getRegisteredDataNodes().stream()
- .filter(
- registeredDataNode -> {
- int id = registeredDataNode.getLocation().getDataNodeId();
- return nodeCacheMap.containsKey(id)
- && NodeStatus.Unknown.equals(nodeCacheMap.get(id).getNodeStatus());
- })
- .collect(Collectors.toList());
- }
-
public static void printRegionRouteMap(
long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> regionRouteMap) {
LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
@@ -511,6 +284,10 @@ public class LoadManager {
}
}
+ public RouteBalancer getRouteBalancer() {
+ return routeBalancer;
+ }
+
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
@@ -526,8 +303,4 @@ public class LoadManager {
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
-
- public Map<Integer, INodeCache> getNodeCacheMap() {
- return nodeCacheMap;
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
index 97df64204a..4611230c5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -50,12 +51,12 @@ public class LoadManagerMetrics {
}
private int getRunningConfigNodesNum() {
- List<TConfigNodeLocation> allConfigNodes =
- configManager.getLoadManager().getOnlineConfigNodes();
- if (allConfigNodes == null) {
+ List<TConfigNodeLocation> runningConfigNodes =
+ getNodeManager().filterConfigNodeThroughStatus(NodeStatus.Running);
+ if (runningConfigNodes == null) {
return 0;
}
- for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
+ for (TConfigNodeLocation configNodeLocation : runningConfigNodes) {
String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint());
MetricsService.getInstance()
@@ -69,15 +70,16 @@ public class LoadManagerMetrics {
"ConfigNode")
.set(1);
}
- return allConfigNodes.size();
+ return runningConfigNodes.size();
}
private int getRunningDataNodesNum() {
- List<TDataNodeConfiguration> allDataNodes = configManager.getLoadManager().getOnlineDataNodes();
- if (allDataNodes == null) {
+ List<TDataNodeConfiguration> runningDataNodes =
+ getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
+ if (runningDataNodes == null) {
return 0;
}
- for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
+ for (TDataNodeConfiguration dataNodeInfo : runningDataNodes) {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
@@ -92,16 +94,16 @@ public class LoadManagerMetrics {
"DataNode")
.set(1);
}
- return allDataNodes.size();
+ return runningDataNodes.size();
}
private int getUnknownConfigNodesNum() {
- List<TConfigNodeLocation> allConfigNodes =
- configManager.getLoadManager().getUnknownConfigNodes();
- if (allConfigNodes == null) {
+ List<TConfigNodeLocation> unknownConfigNodes =
+ getNodeManager().filterConfigNodeThroughStatus(NodeStatus.Unknown);
+ if (unknownConfigNodes == null) {
return 0;
}
- for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
+ for (TConfigNodeLocation configNodeLocation : unknownConfigNodes) {
String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint());
MetricsService.getInstance()
@@ -115,16 +117,16 @@ public class LoadManagerMetrics {
"ConfigNode")
.set(0);
}
- return allConfigNodes.size();
+ return unknownConfigNodes.size();
}
private int getUnknownDataNodesNum() {
- List<TDataNodeConfiguration> allDataNodes =
- configManager.getLoadManager().getUnknownDataNodes();
- if (allDataNodes == null) {
+ List<TDataNodeConfiguration> unknownDataNodes =
+ getNodeManager().filterDataNodeThroughStatus(NodeStatus.Unknown);
+ if (unknownDataNodes == null) {
return 0;
}
- for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
+ for (TDataNodeConfiguration dataNodeInfo : unknownDataNodes) {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
@@ -139,7 +141,7 @@ public class LoadManagerMetrics {
"DataNode")
.set(0);
}
- return allDataNodes.size();
+ return unknownDataNodes.size();
}
public void addNodeMetrics() {
@@ -202,8 +204,7 @@ public class LoadManagerMetrics {
public Integer getLeadershipCountByDatanode(int dataNodeId) {
Map<Integer, Integer> idToCountMap = new ConcurrentHashMap<>();
- configManager
- .getLoadManager()
+ getPartitionManager()
.getAllLeadership()
.forEach((consensusGroupId, nodeId) -> idToCountMap.merge(nodeId, 1, Integer::sum));
return idToCountMap.get(dataNodeId);
@@ -269,4 +270,8 @@ public class LoadManagerMetrics {
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
+
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 8c92c298fb..da94fce128 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -29,8 +30,8 @@ import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.NodeManager;
import org.apache.iotdb.confignode.manager.PartitionManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
@@ -68,7 +69,8 @@ public class RegionBalancer {
CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
IRegionAllocator regionAllocator = genRegionAllocator();
- List<TDataNodeConfiguration> onlineDataNodes = getLoadManager().getOnlineDataNodes();
+ List<TDataNodeConfiguration> onlineDataNodes =
+ getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
@@ -120,8 +122,8 @@ public class RegionBalancer {
}
}
- private LoadManager getLoadManager() {
- return configManager.getLoadManager();
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
}
private ClusterSchemaManager getClusterSchemaManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index af30d14378..1ef562245b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.LazyGreedyRouter;
import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
@@ -84,9 +86,9 @@ public class RouteBalancer {
case SchemaRegion:
if (policy.equals(leaderPolicy)) {
return new LeaderRouter(
- getLoadManager().getAllLeadership(), getLoadManager().getAllLoadScores());
+ getPartitionManager().getAllLeadership(), getNodeManager().getAllLoadScores());
} else {
- return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ return new LoadScoreGreedyRouter(getNodeManager().getAllLoadScores());
}
case DataRegion:
default:
@@ -95,19 +97,24 @@ public class RouteBalancer {
.getDataRegionConsensusProtocolClass()
.equals(ConsensusFactory.MultiLeaderConsensus)) {
// Latent router for MultiLeader consensus protocol
- lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes());
+ lazyGreedyRouter.updateUnknownDataNodes(
+ getNodeManager().filterDataNodeThroughStatus(NodeStatus.Unknown));
return lazyGreedyRouter;
} else if (policy.equals(leaderPolicy)) {
return new LeaderRouter(
- getLoadManager().getAllLeadership(), getLoadManager().getAllLoadScores());
+ getPartitionManager().getAllLeadership(), getNodeManager().getAllLoadScores());
} else {
- return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+ return new LoadScoreGreedyRouter(getNodeManager().getAllLoadScores());
}
}
}
- private LoadManager getLoadManager() {
- return configManager.getLoadManager();
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
+ }
+
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
}
public Map<TConsensusGroupId, TRegionReplicaSet> getRouteMap() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index a354c9aa24..53a5dff62d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.manager.load.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.NodeManager;
import java.util.LinkedList;
@@ -59,7 +59,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
@Override
public boolean updateLoadStatistic() {
- if (configNodeLocation.getInternalEndPoint().equals(LoadManager.CURRENT_NODE)) {
+ if (configNodeLocation.getInternalEndPoint().equals(NodeManager.CURRENT_NODE)) {
this.status = NodeStatus.Running;
return false;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 7a5af9d818..9daa2d4ea7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -203,9 +203,6 @@ public class ConfigNodeProcedureEnv {
try {
// Execute removePeer
if (getConsensusManager().removeConfigNodePeer(tConfigNodeLocation)) {
- configManager
- .getLoadManager()
- .removeNodeHeartbeatHandCache(tConfigNodeLocation.getConfigNodeId());
tsStatus =
getConsensusManager().write(new RemoveConfigNodePlan(tConfigNodeLocation)).getStatus();
} else {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index ca4d63156b..dfa48014c0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
@@ -88,7 +89,7 @@ public class DataNodeRemoveHandler {
"DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TEndPoint> otherOnlineDataNodes =
- configManager.getLoadManager().getOnlineDataNodes().stream()
+ configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.filter(loc -> !loc.equals(disabledDataNode))
.map(TDataNodeLocation::getInternalEndPoint)
@@ -293,7 +294,7 @@ public class DataNodeRemoveHandler {
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
List<TDataNodeLocation> regionReplicaNodes) {
- return configManager.getLoadManager().getOnlineDataNodes().stream()
+ return configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.filter(e -> !regionReplicaNodes.contains(e))
.findAny();