You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/12 15:43:19 UTC

[iotdb] branch master updated: [IOTDB-4112] Decoupling heartbeat scheduled executor service from LoadManager (#6979)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5516fe17ff [IOTDB-4112] Decoupling heartbeat scheduled executor service from LoadManager (#6979)
5516fe17ff is described below

commit 5516fe17ffdd0831a7d1caf7a1a4659ceaaa6bad
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Fri Aug 12 23:43:12 2022 +0800

    [IOTDB-4112] Decoupling heartbeat scheduled executor service from LoadManager (#6979)
---
 .../statemachine/PartitionRegionStateMachine.java  |   8 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   5 +-
 .../iotdb/confignode/manager/NodeManager.java      | 229 ++++++++++++++++--
 .../iotdb/confignode/manager/PartitionManager.java |  52 +++-
 .../iotdb/confignode/manager/load/LoadManager.java | 261 ++-------------------
 .../manager/load/LoadManagerMetrics.java           |  47 ++--
 .../manager/load/balancer/RegionBalancer.java      |  10 +-
 .../manager/load/balancer/RouteBalancer.java       |  23 +-
 .../load/heartbeat/ConfigNodeHeartbeatCache.java   |   4 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   3 -
 .../procedure/env/DataNodeRemoveHandler.java       |   5 +-
 11 files changed, 330 insertions(+), 317 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 4259e5df71..7af80b66f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 
-/** Statemachine for PartitionRegion */
+/** StateMachine for PartitionRegion */
 public class PartitionRegionStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
@@ -141,13 +141,15 @@ public class PartitionRegionStateMachine implements IStateMachine, IStateMachine
     if (currentNode.equals(newLeader)) {
       LOGGER.info("Current node {} becomes Leader", newLeader);
       configManager.getProcedureManager().shiftExecutor(true);
-      configManager.getLoadManager().start();
+      configManager.getLoadManager().startLoadBalancingService();
+      configManager.getNodeManager().startHeartbeatService();
       configManager.getPartitionManager().startRegionCleaner();
     } else {
       LOGGER.info(
           "Current node {} is not longer the leader, the new leader is {}", currentNode, newLeader);
       configManager.getProcedureManager().shiftExecutor(false);
-      configManager.getLoadManager().stop();
+      configManager.getLoadManager().stopLoadBalancingService();
+      configManager.getNodeManager().stopHeartbeatService();
       configManager.getPartitionManager().stopRegionCleaner();
     }
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a7a53a8f6a..b5b93d5267 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -245,7 +245,7 @@ public class ConfigManager implements IManager {
               .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
               .collect(Collectors.toList());
       Map<Integer, String> nodeStatus = new HashMap<>();
-      getLoadManager()
+      getNodeManager()
           .getNodeCacheMap()
           .forEach(
               (nodeId, heartbeatCache) ->
@@ -858,7 +858,8 @@ public class ConfigManager implements IManager {
           .getRegionInfoList()
           .forEach(
               regionInfo -> {
-                Map<TConsensusGroupId, Integer> allLeadership = loadManager.getAllLeadership();
+                Map<TConsensusGroupId, Integer> allLeadership =
+                    getPartitionManager().getAllLeadership();
                 if (!allLeadership.isEmpty()) {
                   String regionType =
                       regionInfo.getDataNodeId()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 881f0b29d4..f188ae7148 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -21,11 +21,19 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
@@ -37,6 +45,8 @@ import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
 import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
 import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
@@ -46,6 +56,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -57,22 +68,44 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /** NodeManager manages cluster node addition and removal requests */
 public class NodeManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
 
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
+
+  public static final TEndPoint CURRENT_NODE =
+      new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
+
   private final IManager configManager;
   private final NodeInfo nodeInfo;
 
   private final ReentrantLock removeConfigNodeLock;
 
+  /** Heartbeat executor service */
+  // Monitor for leadership change
+  private final Object scheduleMonitor = new Object();
+  // Map<NodeId, INodeCache>
+  private final Map<Integer, INodeCache> nodeCacheMap;
+  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+  private Future<?> currentHeartbeatFuture;
+  private final ScheduledExecutorService heartBeatExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
+
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
+    this.nodeCacheMap = new ConcurrentHashMap<>();
   }
 
   private void setGlobalConfig(DataNodeRegisterResp dataSet) {
@@ -181,15 +214,6 @@ public class NodeManager {
     return nodeInfo.getRegisteredDataNodeCount();
   }
 
-  /**
-   * Only leader use this interface
-   *
-   * @return The number of registered TotalNodes
-   */
-  public int getRegisteredNodeCount() {
-    return nodeInfo.getRegisteredNodeCount();
-  }
-
   /**
    * Only leader use this interface
    *
@@ -220,15 +244,6 @@ public class NodeManager {
     return dataNodeLocations;
   }
 
-  private INodeCache getNodeCache(int nodeId) {
-    return getLoadManager().getNodeCacheMap().get(nodeId);
-  }
-
-  private String getNodeStatus(int nodeId) {
-    INodeCache nodeCache = getNodeCache(nodeId);
-    return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus();
-  }
-
   public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
     List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
     List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
@@ -293,7 +308,7 @@ public class NodeManager {
     removeConfigNodeLock.tryLock();
     try {
       // Check OnlineConfigNodes number
-      if (getLoadManager().getOnlineConfigNodes().size() <= 1) {
+      if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
         return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
             .setMessage(
                 "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
@@ -331,7 +346,7 @@ public class NodeManager {
   private TSStatus transferLeader(
       RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
     TConfigNodeLocation newLeader =
-        getLoadManager().getOnlineConfigNodes().stream()
+        filterConfigNodeThroughStatus(NodeStatus.Running).stream()
             .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
             .findAny()
             .get();
@@ -384,6 +399,176 @@ public class NodeManager {
     return dataNodeResponseStatus;
   }
 
+  /** Start the heartbeat service */
+  public void startHeartbeatService() {
+    synchronized (scheduleMonitor) {
+      if (currentHeartbeatFuture == null) {
+        currentHeartbeatFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                heartBeatExecutor,
+                this::heartbeatLoopBody,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("Heartbeat service is started successfully.");
+      }
+    }
+  }
+
+  /** loop body of the heartbeat thread */
+  private void heartbeatLoopBody() {
+    if (getConsensusManager().isLeader()) {
+      // Generate HeartbeatReq
+      THeartbeatReq heartbeatReq = genHeartbeatReq();
+      // Send heartbeat requests to all the registered DataNodes
+      pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
+      // Send heartbeat requests to all the registered ConfigNodes
+      pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
+    }
+  }
+
+  private THeartbeatReq genHeartbeatReq() {
+    /* Generate heartbeat request */
+    THeartbeatReq heartbeatReq = new THeartbeatReq();
+    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+    // We update RegionGroups' leadership once every 5 heartbeat loops
+    heartbeatReq.setNeedJudgeLeader(heartbeatCounter.get() % 5 == 0);
+    // We sample each DataNode's load once every 10 heartbeat loops
+    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
+
+    /* Update heartbeat counter */
+    heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+    return heartbeatReq;
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered DataNodes
+   *
+   * @param registeredDataNodes DataNodes that registered in cluster
+   */
+  private void pingRegisteredDataNodes(
+      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+    // Send heartbeat requests
+    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
+      DataNodeHeartbeatHandler handler =
+          new DataNodeHeartbeatHandler(
+              dataNodeInfo.getLocation(),
+              (DataNodeHeartbeatCache)
+                  nodeCacheMap.computeIfAbsent(
+                      dataNodeInfo.getLocation().getDataNodeId(),
+                      empty -> new DataNodeHeartbeatCache()),
+              getPartitionManager().getRegionGroupCacheMap());
+      AsyncDataNodeHeartbeatClientPool.getInstance()
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+    }
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered ConfigNodes
+   *
+   * @param registeredConfigNodes ConfigNodes that registered in cluster
+   */
+  private void pingRegisteredConfigNodes(
+      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+    // Send heartbeat requests
+    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+      if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
+        // Skip itself
+        nodeCacheMap.putIfAbsent(
+            configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
+        continue;
+      }
+
+      ConfigNodeHeartbeatHandler handler =
+          new ConfigNodeHeartbeatHandler(
+              (ConfigNodeHeartbeatCache)
+                  nodeCacheMap.computeIfAbsent(
+                      configNodeLocation.getConfigNodeId(),
+                      empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
+      AsyncConfigNodeHeartbeatClientPool.getInstance()
+          .getConfigNodeHeartBeat(
+              configNodeLocation.getInternalEndPoint(),
+              heartbeatReq.getHeartbeatTimestamp(),
+              handler);
+    }
+  }
+
+  /** Stop the heartbeat service */
+  public void stopHeartbeatService() {
+    synchronized (scheduleMonitor) {
+      if (currentHeartbeatFuture != null) {
+        currentHeartbeatFuture.cancel(false);
+        currentHeartbeatFuture = null;
+        nodeCacheMap.clear();
+        LOGGER.info("Heartbeat service is stopped successfully.");
+      }
+    }
+  }
+
+  public Map<Integer, INodeCache> getNodeCacheMap() {
+    return nodeCacheMap;
+  }
+
+  /**
+   * Safely get the specific Node's current status
+   *
+   * @param nodeId The specific Node's index
+   * @return The specific Node's current status if the nodeCache contains it, Unknown otherwise
+   */
+  private String getNodeStatus(int nodeId) {
+    INodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null ? "Unknown" : nodeCache.getNodeStatus().getStatus();
+  }
+
+  /**
+   * Filter the registered ConfigNodes through the specific NodeStatus
+   *
+   * @param status The specific NodeStatus
+   * @return Filtered ConfigNodes with the specific NodeStatus
+   */
+  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus status) {
+    return getRegisteredConfigNodes().stream()
+        .filter(
+            registeredConfigNode -> {
+              int configNodeId = registeredConfigNode.getConfigNodeId();
+              return nodeCacheMap.containsKey(configNodeId)
+                  && status.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Filter the registered DataNodes through the specific NodeStatus
+   *
+   * @param status The specific NodeStatus
+   * @return Filtered DataNodes with the specific NodeStatus
+   */
+  public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus status) {
+    return getRegisteredDataNodes().stream()
+        .filter(
+            registeredDataNode -> {
+              int id = registeredDataNode.getLocation().getDataNodeId();
+              return nodeCacheMap.containsKey(id)
+                  && status.equals(nodeCacheMap.get(id).getNodeStatus());
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the loadScore of each DataNode
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllLoadScores() {
+    Map<Integer, Long> result = new ConcurrentHashMap<>();
+
+    nodeCacheMap.forEach(
+        (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
+
+    return result;
+  }
+
   public List<TConfigNodeLocation> getRegisteredConfigNodes() {
     return nodeInfo.getRegisteredConfigNodes();
   }
@@ -396,7 +581,7 @@ public class NodeManager {
     return configManager.getClusterSchemaManager();
   }
 
-  private LoadManager getLoadManager() {
-    return configManager.getLoadManager();
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 5aeb9885e3..c66e9a72ae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -49,7 +49,9 @@ import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -86,15 +88,19 @@ public class PartitionManager {
   private final ScheduledExecutorService regionCleaner;
   private Future<?> currentRegionCleanerFuture;
 
+  // Map<RegionId, RegionGroupCache>
+  private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
+
   public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
     this.configManager = configManager;
     this.partitionInfo = partitionInfo;
     this.regionCleaner =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Cleaner");
+    this.regionGroupCacheMap = new ConcurrentHashMap<>();
     setSeriesPartitionExecutor();
   }
 
-  /** Construct SeriesPartitionExecutor by iotdb-confignode.propertis */
+  /** Construct SeriesPartitionExecutor by iotdb-confignode.properties */
   private void setSeriesPartitionExecutor() {
     ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
     this.executor =
@@ -407,7 +413,7 @@ public class PartitionManager {
    *
    * @param physicalPlan GetNodesPathsPartitionReq
    * @return SchemaNodeManagementPartitionDataSet that contains only existing matched
-   *     SchemaPartition and matched child paths aboveMtree
+   *     SchemaPartition and matched child paths aboveMTree
    */
   public DataSet getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) {
     SchemaNodeManagementResp schemaNodeManagementResp;
@@ -501,11 +507,49 @@ public class PartitionManager {
         /* Stop the RegionCleaner service */
         currentRegionCleanerFuture.cancel(false);
         currentRegionCleanerFuture = null;
+        regionGroupCacheMap.clear();
         LOGGER.info("RegionCleaner is stopped successfully.");
       }
     }
   }
 
+  public Map<TConsensusGroupId, IRegionGroupCache> getRegionGroupCacheMap() {
+    return regionGroupCacheMap;
+  }
+
+  /**
+   * Get the leadership of each RegionGroup
+   *
+   * @return Map<RegionGroupId, DataNodeId of the leader>
+   */
+  public Map<TConsensusGroupId, Integer> getAllLeadership() {
+    Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
+    if (ConfigNodeDescriptor.getInstance()
+        .getConf()
+        .getDataRegionConsensusProtocolClass()
+        .equals(ConsensusFactory.MultiLeaderConsensus)) {
+      regionGroupCacheMap.forEach(
+          (consensusGroupId, regionGroupCache) -> {
+            if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) {
+              result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId());
+            }
+          });
+      getLoadManager()
+          .getRouteBalancer()
+          .getRouteMap()
+          .forEach(
+              (consensusGroupId, regionReplicaSet) ->
+                  result.put(
+                      consensusGroupId,
+                      regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId()));
+    } else {
+      regionGroupCacheMap.forEach(
+          (consensusGroupId, regionGroupCache) ->
+              result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId()));
+    }
+    return result;
+  }
+
   public ScheduledExecutorService getRegionCleaner() {
     return regionCleaner;
   }
@@ -514,10 +558,6 @@ public class PartitionManager {
     return configManager.getConsensusManager();
   }
 
-  private NodeManager getNodeManager() {
-    return configManager.getNodeManager();
-  }
-
   private ClusterSchemaManager getClusterSchemaManager() {
     return configManager.getClusterSchemaManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 800ce1628d..abebafa3c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -18,12 +18,9 @@
  */
 package org.apache.iotdb.confignode.manager.load;
 
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -33,11 +30,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -51,12 +44,8 @@ import org.apache.iotdb.confignode.manager.PartitionManager;
 import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 
 import org.slf4j.Logger;
@@ -70,7 +59,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -83,19 +71,8 @@ public class LoadManager {
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
 
-  private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
-
-  public static final TEndPoint CURRENT_NODE =
-      new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
-
   private final IManager configManager;
 
-  /** Heartbeat sample cache */
-  // Map<NodeId, IHeartbeatStatistic>
-  private final Map<Integer, INodeCache> nodeCacheMap;
-  // Map<RegionId, RegionGroupCache>
-  private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
-
   /** Balancers */
   private final RegionBalancer regionBalancer;
 
@@ -103,26 +80,16 @@ public class LoadManager {
   private final RouteBalancer routeBalancer;
   private final LoadManagerMetrics loadManagerMetrics;
 
-  /** Heartbeat executor service */
-  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
-
-  private Future<?> currentHeartbeatFuture;
-  private final ScheduledExecutorService heartBeatExecutor =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
-
   /** Load balancing executor service */
   private Future<?> currentLoadBalancingFuture;
 
   private final ScheduledExecutorService loadBalancingExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
-
-  /** Monitor for leadership change */
+  // Monitor for leadership change
   private final Object scheduleMonitor = new Object();
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
-    this.nodeCacheMap = new ConcurrentHashMap<>();
-    this.regionGroupCacheMap = new ConcurrentHashMap<>();
 
     this.regionBalancer = new RegionBalancer(configManager);
     this.partitionBalancer = new PartitionBalancer(configManager);
@@ -193,77 +160,16 @@ public class LoadManager {
     return routeBalancer.genLatestRegionRouteMap(getPartitionManager().getAllReplicaSets());
   }
 
-  /**
-   * Get the loadScore of each DataNode
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllLoadScores() {
-    Map<Integer, Long> result = new ConcurrentHashMap<>();
-
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
-
-    return result;
-  }
-
-  /**
-   * Get the leadership of each RegionGroup
-   *
-   * @return Map<RegionGroupId, leader location>
-   */
-  public Map<TConsensusGroupId, Integer> getAllLeadership() {
-    Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
-    if (ConfigNodeDescriptor.getInstance()
-        .getConf()
-        .getDataRegionConsensusProtocolClass()
-        .equals(ConsensusFactory.MultiLeaderConsensus)) {
-      regionGroupCacheMap.forEach(
-          (consensusGroupId, regionGroupCache) -> {
-            if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) {
-              result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId());
-            }
-          });
-      routeBalancer
-          .getRouteMap()
-          .forEach(
-              (consensusGroupId, regionReplicaSet) -> {
-                result.put(
-                    consensusGroupId,
-                    regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId());
-              });
-    } else {
-      regionGroupCacheMap.forEach(
-          (consensusGroupId, regionGroupCache) ->
-              result.put(consensusGroupId, regionGroupCache.getLeaderDataNodeId()));
-    }
-    return result;
-  }
-
-  /** Start the heartbeat service and the load balancing service */
-  public void start() {
-    LOGGER.debug("Start Heartbeat Service of LoadManager");
+  /** Start the load balancing service */
+  public void startLoadBalancingService() {
     synchronized (scheduleMonitor) {
-      /* Start the heartbeat service */
-      if (currentHeartbeatFuture == null) {
-        currentHeartbeatFuture =
-            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                heartBeatExecutor,
-                this::heartbeatLoopBody,
-                0,
-                HEARTBEAT_INTERVAL,
-                TimeUnit.MILLISECONDS);
-        LOGGER.info("Heartbeat service is started successfully.");
-      }
-
-      /* Start the load balancing service */
       if (currentLoadBalancingFuture == null) {
         currentLoadBalancingFuture =
             ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
                 loadBalancingExecutor,
                 this::updateNodeLoadStatistic,
                 0,
-                HEARTBEAT_INTERVAL,
+                NodeManager.HEARTBEAT_INTERVAL,
                 TimeUnit.MILLISECONDS);
         LOGGER.info("LoadBalancing service is started successfully.");
       }
@@ -271,15 +177,11 @@ public class LoadManager {
     }
   }
 
-  /** Stop the heartbeat service and the load balancing service */
-  public void stop() {
+  /** Stop the load balancing service */
+  public void stopLoadBalancingService() {
     loadManagerMetrics.removeMetrics();
-    LOGGER.debug("Stop Heartbeat Service and LoadBalancing Service of LoadManager");
     synchronized (scheduleMonitor) {
-      if (currentHeartbeatFuture != null) {
-        currentHeartbeatFuture.cancel(false);
-        currentHeartbeatFuture = null;
-        LOGGER.info("Heartbeat service is stopped successfully.");
+      if (currentLoadBalancingFuture != null) {
         currentLoadBalancingFuture.cancel(false);
         currentLoadBalancingFuture = null;
         LOGGER.info("LoadBalancing service is stopped successfully.");
@@ -293,7 +195,8 @@ public class LoadManager {
     AtomicBoolean existChangeLeaderDataRegionGroup = new AtomicBoolean(false);
     boolean isNeedBroadcast = false;
 
-    nodeCacheMap
+    getNodeManager()
+        .getNodeCacheMap()
         .values()
         .forEach(
             nodeCache -> {
@@ -304,7 +207,8 @@ public class LoadManager {
               }
             });
 
-    regionGroupCacheMap
+    getPartitionManager()
+        .getRegionGroupCacheMap()
         .values()
         .forEach(
             regionGroupCache -> {
@@ -347,7 +251,8 @@ public class LoadManager {
   public void broadcastLatestRegionRouteMap() {
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
     Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    getOnlineDataNodes()
+    getNodeManager()
+        .filterDataNodeThroughStatus(NodeStatus.Running)
         .forEach(
             onlineDataNode ->
                 dataNodeLocationMap.put(
@@ -365,138 +270,6 @@ public class LoadManager {
     LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap finished.");
   }
 
-  /** loop body of the heartbeat thread */
-  private void heartbeatLoopBody() {
-    if (getConsensusManager().isLeader()) {
-      // Generate HeartbeatReq
-      THeartbeatReq heartbeatReq = genHeartbeatReq();
-      // Send heartbeat requests to all the registered DataNodes
-      pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
-      // Send heartbeat requests to all the registered ConfigNodes
-      pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
-    }
-  }
-
-  private THeartbeatReq genHeartbeatReq() {
-    /* Generate heartbeat request */
-    THeartbeatReq heartbeatReq = new THeartbeatReq();
-    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
-    // We update RegionGroups' leadership in every 5 heartbeat loop
-    heartbeatReq.setNeedJudgeLeader(heartbeatCounter.get() % 5 == 0);
-    // We sample DataNode's load in every 10 heartbeat loop
-    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
-    /* Update heartbeat counter */
-    heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
-    return heartbeatReq;
-  }
-
-  /**
-   * Send heartbeat requests to all the Registered DataNodes
-   *
-   * @param registeredDataNodes DataNodes that registered in cluster
-   */
-  private void pingRegisteredDataNodes(
-      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
-    // Send heartbeat requests
-    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
-      DataNodeHeartbeatHandler handler =
-          new DataNodeHeartbeatHandler(
-              dataNodeInfo.getLocation(),
-              (DataNodeHeartbeatCache)
-                  nodeCacheMap.computeIfAbsent(
-                      dataNodeInfo.getLocation().getDataNodeId(),
-                      empty -> new DataNodeHeartbeatCache()),
-              regionGroupCacheMap);
-      AsyncDataNodeHeartbeatClientPool.getInstance()
-          .getDataNodeHeartBeat(
-              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
-    }
-  }
-
-  /**
-   * Send heartbeat requests to all the Registered ConfigNodes
-   *
-   * @param registeredConfigNodes ConfigNodes that registered in cluster
-   */
-  private void pingRegisteredConfigNodes(
-      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
-    // Send heartbeat requests
-    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
-      if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
-        // Skip itself
-        nodeCacheMap.putIfAbsent(
-            configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
-        continue;
-      }
-
-      ConfigNodeHeartbeatHandler handler =
-          new ConfigNodeHeartbeatHandler(
-              (ConfigNodeHeartbeatCache)
-                  nodeCacheMap.computeIfAbsent(
-                      configNodeLocation.getConfigNodeId(),
-                      empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
-      AsyncConfigNodeHeartbeatClientPool.getInstance()
-          .getConfigNodeHeartBeat(
-              configNodeLocation.getInternalEndPoint(),
-              heartbeatReq.getHeartbeatTimestamp(),
-              handler);
-    }
-  }
-
-  /**
-   * When a node is removed, clear the node's cache
-   *
-   * @param nodeId removed node id
-   */
-  public void removeNodeHeartbeatHandCache(Integer nodeId) {
-    nodeCacheMap.remove(nodeId);
-  }
-
-  public List<TConfigNodeLocation> getOnlineConfigNodes() {
-    return getNodeManager().getRegisteredConfigNodes().stream()
-        .filter(
-            registeredConfigNode -> {
-              int configNodeId = registeredConfigNode.getConfigNodeId();
-              return nodeCacheMap.containsKey(configNodeId)
-                  && NodeStatus.Running.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
-            })
-        .collect(Collectors.toList());
-  }
-
-  public List<TDataNodeConfiguration> getOnlineDataNodes() {
-    return getNodeManager().getRegisteredDataNodes().stream()
-        .filter(
-            registeredDataNode -> {
-              int id = registeredDataNode.getLocation().getDataNodeId();
-              return nodeCacheMap.containsKey(id)
-                  && NodeStatus.Running.equals(nodeCacheMap.get(id).getNodeStatus());
-            })
-        .collect(Collectors.toList());
-  }
-
-  public List<TConfigNodeLocation> getUnknownConfigNodes() {
-    return getNodeManager().getRegisteredConfigNodes().stream()
-        .filter(
-            registeredConfigNode -> {
-              int configNodeId = registeredConfigNode.getConfigNodeId();
-              return nodeCacheMap.containsKey(configNodeId)
-                  && NodeStatus.Unknown.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
-            })
-        .collect(Collectors.toList());
-  }
-
-  public List<TDataNodeConfiguration> getUnknownDataNodes() {
-    return getNodeManager().getRegisteredDataNodes().stream()
-        .filter(
-            registeredDataNode -> {
-              int id = registeredDataNode.getLocation().getDataNodeId();
-              return nodeCacheMap.containsKey(id)
-                  && NodeStatus.Unknown.equals(nodeCacheMap.get(id).getNodeStatus());
-            })
-        .collect(Collectors.toList());
-  }
-
   public static void printRegionRouteMap(
       long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> regionRouteMap) {
     LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
@@ -511,6 +284,10 @@ public class LoadManager {
     }
   }
 
+  public RouteBalancer getRouteBalancer() {
+    return routeBalancer;
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
@@ -526,8 +303,4 @@ public class LoadManager {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
-
-  public Map<Integer, INodeCache> getNodeCacheMap() {
-    return nodeCacheMap;
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
index 97df64204a..4611230c5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -50,12 +51,12 @@ public class LoadManagerMetrics {
   }
 
   private int getRunningConfigNodesNum() {
-    List<TConfigNodeLocation> allConfigNodes =
-        configManager.getLoadManager().getOnlineConfigNodes();
-    if (allConfigNodes == null) {
+    List<TConfigNodeLocation> runningConfigNodes =
+        getNodeManager().filterConfigNodeThroughStatus(NodeStatus.Running);
+    if (runningConfigNodes == null) {
       return 0;
     }
-    for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
+    for (TConfigNodeLocation configNodeLocation : runningConfigNodes) {
       String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint());
 
       MetricsService.getInstance()
@@ -69,15 +70,16 @@ public class LoadManagerMetrics {
               "ConfigNode")
           .set(1);
     }
-    return allConfigNodes.size();
+    return runningConfigNodes.size();
   }
 
   private int getRunningDataNodesNum() {
-    List<TDataNodeConfiguration> allDataNodes = configManager.getLoadManager().getOnlineDataNodes();
-    if (allDataNodes == null) {
+    List<TDataNodeConfiguration> runningDataNodes =
+        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
+    if (runningDataNodes == null) {
       return 0;
     }
-    for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
+    for (TDataNodeConfiguration dataNodeInfo : runningDataNodes) {
       TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
       String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
 
@@ -92,16 +94,16 @@ public class LoadManagerMetrics {
               "DataNode")
           .set(1);
     }
-    return allDataNodes.size();
+    return runningDataNodes.size();
   }
 
   private int getUnknownConfigNodesNum() {
-    List<TConfigNodeLocation> allConfigNodes =
-        configManager.getLoadManager().getUnknownConfigNodes();
-    if (allConfigNodes == null) {
+    List<TConfigNodeLocation> unknownConfigNodes =
+        getNodeManager().filterConfigNodeThroughStatus(NodeStatus.Unknown);
+    if (unknownConfigNodes == null) {
       return 0;
     }
-    for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
+    for (TConfigNodeLocation configNodeLocation : unknownConfigNodes) {
       String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint());
 
       MetricsService.getInstance()
@@ -115,16 +117,16 @@ public class LoadManagerMetrics {
               "ConfigNode")
           .set(0);
     }
-    return allConfigNodes.size();
+    return unknownConfigNodes.size();
   }
 
   private int getUnknownDataNodesNum() {
-    List<TDataNodeConfiguration> allDataNodes =
-        configManager.getLoadManager().getUnknownDataNodes();
-    if (allDataNodes == null) {
+    List<TDataNodeConfiguration> unknownDataNodes =
+        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Unknown);
+    if (unknownDataNodes == null) {
       return 0;
     }
-    for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
+    for (TDataNodeConfiguration dataNodeInfo : unknownDataNodes) {
       TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
       String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
 
@@ -139,7 +141,7 @@ public class LoadManagerMetrics {
               "DataNode")
           .set(0);
     }
-    return allDataNodes.size();
+    return unknownDataNodes.size();
   }
 
   public void addNodeMetrics() {
@@ -202,8 +204,7 @@ public class LoadManagerMetrics {
   public Integer getLeadershipCountByDatanode(int dataNodeId) {
     Map<Integer, Integer> idToCountMap = new ConcurrentHashMap<>();
 
-    configManager
-        .getLoadManager()
+    getPartitionManager()
         .getAllLeadership()
         .forEach((consensusGroupId, nodeId) -> idToCountMap.merge(nodeId, 1, Integer::sum));
     return idToCountMap.get(dataNodeId);
@@ -269,4 +270,8 @@ public class LoadManagerMetrics {
   private NodeManager getNodeManager() {
     return configManager.getNodeManager();
   }
+
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 8c92c298fb..da94fce128 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
@@ -29,8 +30,8 @@ import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.NodeManager;
 import org.apache.iotdb.confignode.manager.PartitionManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
@@ -68,7 +69,8 @@ public class RegionBalancer {
     CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
     IRegionAllocator regionAllocator = genRegionAllocator();
 
-    List<TDataNodeConfiguration> onlineDataNodes = getLoadManager().getOnlineDataNodes();
+    List<TDataNodeConfiguration> onlineDataNodes =
+        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
     List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
 
     for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
@@ -120,8 +122,8 @@ public class RegionBalancer {
     }
   }
 
-  private LoadManager getLoadManager() {
-    return configManager.getLoadManager();
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
   }
 
   private ClusterSchemaManager getClusterSchemaManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index af30d14378..1ef562245b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.manager.load.balancer;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.LazyGreedyRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
@@ -84,9 +86,9 @@ public class RouteBalancer {
       case SchemaRegion:
         if (policy.equals(leaderPolicy)) {
           return new LeaderRouter(
-              getLoadManager().getAllLeadership(), getLoadManager().getAllLoadScores());
+              getPartitionManager().getAllLeadership(), getNodeManager().getAllLoadScores());
         } else {
-          return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+          return new LoadScoreGreedyRouter(getNodeManager().getAllLoadScores());
         }
       case DataRegion:
       default:
@@ -95,19 +97,24 @@ public class RouteBalancer {
             .getDataRegionConsensusProtocolClass()
             .equals(ConsensusFactory.MultiLeaderConsensus)) {
           // Latent router for MultiLeader consensus protocol
-          lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes());
+          lazyGreedyRouter.updateUnknownDataNodes(
+              getNodeManager().filterDataNodeThroughStatus(NodeStatus.Unknown));
           return lazyGreedyRouter;
         } else if (policy.equals(leaderPolicy)) {
           return new LeaderRouter(
-              getLoadManager().getAllLeadership(), getLoadManager().getAllLoadScores());
+              getPartitionManager().getAllLeadership(), getNodeManager().getAllLoadScores());
         } else {
-          return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+          return new LoadScoreGreedyRouter(getNodeManager().getAllLoadScores());
         }
     }
   }
 
-  private LoadManager getLoadManager() {
-    return configManager.getLoadManager();
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
+  }
+
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
   }
 
   public Map<TConsensusGroupId, TRegionReplicaSet> getRouteMap() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index a354c9aa24..53a5dff62d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.manager.load.heartbeat;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.NodeManager;
 
 import java.util.LinkedList;
 
@@ -59,7 +59,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
 
   @Override
   public boolean updateLoadStatistic() {
-    if (configNodeLocation.getInternalEndPoint().equals(LoadManager.CURRENT_NODE)) {
+    if (configNodeLocation.getInternalEndPoint().equals(NodeManager.CURRENT_NODE)) {
       this.status = NodeStatus.Running;
       return false;
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 7a5af9d818..9daa2d4ea7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -203,9 +203,6 @@ public class ConfigNodeProcedureEnv {
     try {
       // Execute removePeer
       if (getConsensusManager().removeConfigNodePeer(tConfigNodeLocation)) {
-        configManager
-            .getLoadManager()
-            .removeNodeHeartbeatHandCache(tConfigNodeLocation.getConfigNodeId());
         tsStatus =
             getConsensusManager().write(new RemoveConfigNodePlan(tConfigNodeLocation)).getStatus();
       } else {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index ca4d63156b..dfa48014c0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
@@ -88,7 +89,7 @@ public class DataNodeRemoveHandler {
         "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     List<TEndPoint> otherOnlineDataNodes =
-        configManager.getLoadManager().getOnlineDataNodes().stream()
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
             .map(TDataNodeConfiguration::getLocation)
             .filter(loc -> !loc.equals(disabledDataNode))
             .map(TDataNodeLocation::getInternalEndPoint)
@@ -293,7 +294,7 @@ public class DataNodeRemoveHandler {
 
   private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
       List<TDataNodeLocation> regionReplicaNodes) {
-    return configManager.getLoadManager().getOnlineDataNodes().stream()
+    return configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
         .map(TDataNodeConfiguration::getLocation)
         .filter(e -> !regionReplicaNodes.contains(e))
         .findAny();