You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/04/13 14:05:30 UTC

[iotdb] branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager created (now 94bed63697)

This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a change to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 94bed63697 ready for test

This branch includes the following new commits:

     new 58a2e633a6 stash for classes move
     new a9841c8d1a temporary relationship
     new 94bed63697 ready for test

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/03: stash for classes move

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58a2e633a6bec7af2fe41c8622e61c4be74b9131
Author: YongzaoDan <53...@qq.com>
AuthorDate: Wed Apr 12 11:09:21 2023 +0800

    stash for classes move
---
 .../heartbeat/ConfigNodeHeartbeatHandler.java      |  18 +-
 .../heartbeat/DataNodeHeartbeatHandler.java        |  58 +++----
 .../statemachine/ConfigRegionStateMachine.java     |   4 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   2 +-
 .../iotdb/confignode/manager/ProcedureManager.java |   2 +-
 .../confignode/manager/RetryFailedTasksThread.java |   4 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  42 +++--
 .../manager/load/balancer/RouteBalancer.java       |  30 +---
 .../load/heartbeat/HeartbeatSampleCache.java       | 123 +++++++++++++
 .../manager/load/heartbeat/HeartbeatService.java   | 190 +++++++++++++++++++++
 .../heartbeat/node}/BaseNodeCache.java             |   3 +-
 .../heartbeat/node}/ConfigNodeHeartbeatCache.java  |   3 +-
 .../heartbeat/node}/DataNodeHeartbeatCache.java    |   3 +-
 .../heartbeat/node}/NodeHeartbeatSample.java       |   2 +-
 .../heartbeat/region}/RegionCache.java             |   7 +-
 .../heartbeat/region}/RegionGroupCache.java        |   4 +-
 .../heartbeat/region}/RegionHeartbeatSample.java   |   2 +-
 .../statistics}/NodeStatistics.java                |   3 +-
 .../statistics}/RegionGroupStatistics.java         |   2 +-
 .../statistics}/RegionStatistics.java              |   3 +-
 .../statistics/StatisticsService.java}             |  30 +---
 .../iotdb/confignode/manager/node/NodeManager.java | 146 +---------------
 .../manager/observer/NodeStatisticsEvent.java      |   2 +-
 .../manager/partition/PartitionManager.java        |  10 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   6 +-
 .../procedure/env/DataNodeRemoveHandler.java       |   2 +-
 .../router/priority/GreedyPriorityTest.java        |   6 +-
 .../priority/LeaderPriorityBalancerTest.java       |   6 +-
 .../confignode/manager/node/NodeCacheTest.java     |   4 +-
 .../manager/partition/RegionGroupCacheTest.java    |   4 +-
 .../persistence/node/NodeStatisticsTest.java       |   2 +-
 .../statistics/RegionGroupStatisticsTest.java      |   4 +-
 .../partition/statistics/RegionStatisticsTest.java |   2 +-
 33 files changed, 427 insertions(+), 302 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index c97f3d8bb4..92881210e7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -18,24 +18,26 @@
  */
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
 public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
 
-  // Update ConfigNodeHeartbeatCache when success
-  private final ConfigNodeHeartbeatCache configNodeHeartbeatCache;
+  private final int nodeId;
+  private final HeartbeatSampleCache cache;
 
-  public ConfigNodeHeartbeatHandler(ConfigNodeHeartbeatCache configNodeHeartbeatCache) {
-    this.configNodeHeartbeatCache = configNodeHeartbeatCache;
+  public ConfigNodeHeartbeatHandler(int nodeId, HeartbeatSampleCache cache) {
+    this.nodeId = nodeId;
+    this.cache = cache;
   }
 
   @Override
   public void onComplete(Long timestamp) {
-    configNodeHeartbeatCache.cacheHeartbeatSample(
-        new NodeHeartbeatSample(timestamp, System.currentTimeMillis()));
+    long receiveTime = System.currentTimeMillis();
+    cache.cacheConfigNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index fe1af63540..bea4455313 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -35,27 +35,23 @@ import java.util.Map;
 
 public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
 
-  // Update DataNodeHeartbeatCache when success
-  private final TDataNodeLocation dataNodeLocation;
-  private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
-  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
-  private final RouteBalancer routeBalancer;
+  private final int nodeId;
+
+  private final HeartbeatSampleCache heartbeatSampleCache;
+
   private final Map<Integer, Long> deviceNum;
   private final Map<Integer, Long> timeSeriesNum;
   private final Map<Integer, Long> regionDisk;
 
   public DataNodeHeartbeatHandler(
-      TDataNodeLocation dataNodeLocation,
-      DataNodeHeartbeatCache dataNodeHeartbeatCache,
-      Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
-      RouteBalancer routeBalancer,
+      int nodeId,
+      HeartbeatSampleCache heartbeatSampleCache,
       Map<Integer, Long> deviceNum,
       Map<Integer, Long> timeSeriesNum,
       Map<Integer, Long> regionDisk) {
-    this.dataNodeLocation = dataNodeLocation;
-    this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
-    this.regionGroupCacheMap = regionGroupCacheMap;
-    this.routeBalancer = routeBalancer;
+
+    this.nodeId = nodeId;
+    this.heartbeatSampleCache = heartbeatSampleCache;
     this.deviceNum = deviceNum;
     this.timeSeriesNum = timeSeriesNum;
     this.regionDisk = regionDisk;
@@ -66,31 +62,29 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
     long receiveTime = System.currentTimeMillis();
 
     // Update NodeCache
-    dataNodeHeartbeatCache.cacheHeartbeatSample(
-        new NodeHeartbeatSample(heartbeatResp, receiveTime));
+    heartbeatSampleCache.cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime));
 
-    // Update RegionGroupCache And leaderCache
     heartbeatResp
         .getJudgedLeaders()
         .forEach(
             (regionGroupId, isLeader) -> {
-              regionGroupCacheMap
-                  .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
-                  .cacheHeartbeatSample(
-                      dataNodeLocation.getDataNodeId(),
-                      new RegionHeartbeatSample(
-                          heartbeatResp.getHeartbeatTimestamp(),
-                          receiveTime,
-                          // Region will inherit DataNode's status
-                          RegionStatus.parse(heartbeatResp.getStatus())));
+              // Update RegionGroupCache
+              heartbeatSampleCache.cacheRegionHeartbeatSample(regionGroupId, nodeId,
+                new RegionHeartbeatSample(
+                  heartbeatResp.getHeartbeatTimestamp(),
+                  receiveTime,
+                  // Region will inherit DataNode's status
+                  RegionStatus.parse(heartbeatResp.getStatus())));
 
               if (isLeader) {
-                routeBalancer.cacheLeaderSample(
+                // Update leaderCache
+                heartbeatSampleCache.cacheLeaderSample(
                     regionGroupId,
                     new Pair<>(
-                        heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
+                        heartbeatResp.getHeartbeatTimestamp(), nodeId));
               }
             });
+
     if (heartbeatResp.getDeviceNum() != null) {
       deviceNum.putAll(heartbeatResp.getDeviceNum());
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 9e93061cf3..78282c4fa3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -209,10 +209,10 @@ public class ConfigRegionStateMachine
 
       // Start leader scheduling services
       configManager.getProcedureManager().shiftExecutor(true);
+      configManager.getLoadManager().startHeartbeatService();
       configManager.getLoadManager().startLoadStatisticsService();
       configManager.getLoadManager().getRouteBalancer().startRouteBalancingService();
       configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
-      configManager.getNodeManager().startHeartbeatService();
       configManager.getPartitionManager().startRegionCleaner();
 
       // we do cq recovery async for two reasons:
@@ -230,10 +230,10 @@ public class ConfigRegionStateMachine
 
       // Stop leader scheduling services
       configManager.getProcedureManager().shiftExecutor(false);
+      configManager.getLoadManager().stopHeartbeatService();
       configManager.getLoadManager().stopLoadStatisticsService();
       configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService();
       configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
-      configManager.getNodeManager().stopHeartbeatService();
       configManager.getPartitionManager().stopRegionCleaner();
       configManager.getCQManager().stopCQScheduler();
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 96ea55f134..cf038e6dba 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -88,7 +88,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.node.NodeMetrics;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 63a78d36c1..f9ac0230fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -41,7 +41,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 97666a005b..3dab05219d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -55,6 +55,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class RetryFailedTasksThread {
 
+  // TODO: Replace this class by cluster events
+
   private static final Logger LOGGER = LoggerFactory.getLogger(RetryFailedTasksThread.class);
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index a19c942d25..4b18d37a0a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -32,23 +33,24 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService;
+import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -76,9 +78,6 @@ public class LoadManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
 
-  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
-  private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
-
   private final IManager configManager;
 
   /** Balancers */
@@ -87,12 +86,16 @@ public class LoadManager {
   private final PartitionBalancer partitionBalancer;
   private final RouteBalancer routeBalancer;
 
+  /** Cluster load services */
+  private final HeartbeatService heartbeatService;
+  private final StatisticsService statisticsService;
+
+
   /** Load statistics executor service */
+  private final Object statisticsScheduleMonitor = new Object();
   private Future<?> currentLoadStatisticsFuture;
-
   private final ScheduledExecutorService loadStatisticsExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
-  private final Object scheduleMonitor = new Object();
 
   private final EventBus eventBus =
       new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
@@ -104,6 +107,9 @@ public class LoadManager {
     this.partitionBalancer = new PartitionBalancer(configManager);
     this.routeBalancer = new RouteBalancer(configManager);
 
+    this.heartbeatService = new HeartbeatService(configManager);
+    this.statisticsService = new StatisticsService(configManager);
+
     eventBus.register(configManager.getClusterSchemaManager());
     eventBus.register(configManager.getSyncManager());
   }
@@ -118,8 +124,8 @@ public class LoadManager {
    * @throws DatabaseNotExistsException If some specific StorageGroups don't exist
    */
   public CreateRegionGroupsPlan allocateRegionGroups(
-      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
-      throws NotEnoughDataNodeException, DatabaseNotExistsException {
+    Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
+    throws NotEnoughDataNodeException, DatabaseNotExistsException {
     return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
   }
 
@@ -130,8 +136,8 @@ public class LoadManager {
    * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
-      throws NoAvailableRegionGroupException {
+    Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+    throws NoAvailableRegionGroupException {
     return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
   }
 
@@ -142,8 +148,8 @@ public class LoadManager {
    * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
-      throws NoAvailableRegionGroupException {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
+    throws NoAvailableRegionGroupException {
     return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
@@ -185,7 +191,7 @@ public class LoadManager {
 
   /** Start the load statistics service */
   public void startLoadStatisticsService() {
-    synchronized (scheduleMonitor) {
+    synchronized (heartbeatScheduleMonitor) {
       if (currentLoadStatisticsFuture == null) {
         currentLoadStatisticsFuture =
             ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
@@ -201,7 +207,7 @@ public class LoadManager {
 
   /** Stop the load statistics service */
   public void stopLoadStatisticsService() {
-    synchronized (scheduleMonitor) {
+    synchronized (heartbeatScheduleMonitor) {
       if (currentLoadStatisticsFuture != null) {
         currentLoadStatisticsFuture.cancel(false);
         currentLoadStatisticsFuture = null;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 31f28397fb..025d29e6fe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -93,12 +93,6 @@ public class RouteBalancer {
 
   private final IManager configManager;
 
-  // Key: RegionGroupId
-  // Value: Pair<Timestamp, LeaderDataNodeId>, where
-  // the left value stands for sampling timestamp
-  // and the right value stands for the index of DataNode that leader resides.
-  private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
-
   /** RegionRouteMap */
   private final RegionRouteMap regionRouteMap;
   // For generating optimal RegionLeaderMap
@@ -107,6 +101,7 @@ public class RouteBalancer {
   private final IPriorityBalancer priorityRouter;
 
   /** Leader Balancing service */
+  // TODO: leader balancing should be triggered by cluster events
   private Future<?> currentLeaderBalancingFuture;
 
   private final ScheduledExecutorService leaderBalancingExecutor =
@@ -115,8 +110,6 @@ public class RouteBalancer {
 
   public RouteBalancer(IManager configManager) {
     this.configManager = configManager;
-
-    this.leaderCache = new ConcurrentHashMap<>();
     this.regionRouteMap = new RegionRouteMap();
 
     switch (CONF.getLeaderDistributionPolicy()) {
@@ -140,27 +133,6 @@ public class RouteBalancer {
     }
   }
 
-  /**
-   * Cache the newest leaderHeartbeatSample
-   *
-   * @param regionGroupId Corresponding RegionGroup's index
-   * @param leaderSample <Sample timestamp, leaderDataNodeId>, The newest HeartbeatSample
-   */
-  public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
-    if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
-        && IS_DATA_REGION_IOT_CONSENSUS) {
-      // The leadership of IoTConsensus protocol is decided by ConfigNode-leader
-      return;
-    }
-
-    leaderCache.putIfAbsent(regionGroupId, leaderSample);
-    synchronized (leaderCache.get(regionGroupId)) {
-      if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) {
-        leaderCache.replace(regionGroupId, leaderSample);
-      }
-    }
-  }
-
   /**
    * Invoking periodically to update the RegionRouteMap
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java
new file mode 100644
index 0000000000..4fcb27d00d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Maintains the heartbeat samples of nodes, RegionGroups and RegionGroup leaders. */
+public class HeartbeatSampleCache {
+
+  // Whether the configured DataRegion consensus protocol class is IoTConsensus
+  private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
+    ConsensusFactory.IOT_CONSENSUS.equals(ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+
+  // Map<NodeId, BaseNodeCache>
+  private final Map<Integer, BaseNodeCache> nodeCacheMap;
+  // Map<RegionGroupId, RegionGroupCache>
+  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+  // Map<RegionGroupId, Pair<Timestamp, LeaderDataNodeId>>, where the left value is the sampling
+  // timestamp and the right value is the index of the DataNode on which the leader resides.
+  private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
+
+  /** Create an empty cache; every internal map is a ConcurrentHashMap. */
+  public HeartbeatSampleCache() {
+    this.nodeCacheMap = new ConcurrentHashMap<>();
+    this.regionGroupCacheMap = new ConcurrentHashMap<>();
+    this.leaderCache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a ConfigNode.
+   *
+   * @param nodeId the id of the ConfigNode
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+    nodeCacheMap.computeIfAbsent(nodeId,
+      empty -> new ConfigNodeHeartbeatCache(nodeId))
+      .cacheHeartbeatSample(sample);
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a DataNode.
+   *
+   * @param nodeId the id of the DataNode
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+    nodeCacheMap.computeIfAbsent(nodeId,
+      empty -> new DataNodeHeartbeatCache())
+      .cacheHeartbeatSample(sample);
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a RegionGroup.
+   *
+   * @param regionGroupId the id of the RegionGroup
+   * @param nodeId the id of the DataNode where specified Region resides
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheRegionHeartbeatSample(TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) {
+    regionGroupCacheMap.computeIfAbsent(regionGroupId,
+      empty -> new RegionGroupCache(regionGroupId))
+      .cacheHeartbeatSample(nodeId, sample);
+  }
+
+  /**
+   * Cache the latest leader of a RegionGroup; only a strictly newer sample replaces the cached one.
+   *
+   * @param regionGroupId the id of the RegionGroup
+   * @param leaderSample Pair<sample timestamp, leader DataNodeId>, the candidate leader sample
+   */
+  public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
+    if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+      && IS_DATA_REGION_IOT_CONSENSUS) {
+      // The leadership of IoTConsensus protocol is decided by ConfigNode-leader
+      return;
+    }
+
+    // merge() is atomic per key, so no lock on a replaceable (or cleared) map value is needed
+    leaderCache.merge(
+      regionGroupId,
+      leaderSample,
+      (cached, latest) -> cached.getLeft() < latest.getLeft() ? latest : cached);
+  }
+
+  /** Remove every cached node, RegionGroup and leader sample. */
+  public void clear() {
+    nodeCacheMap.clear();
+    regionGroupCacheMap.clear();
+    leaderCache.clear();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
new file mode 100644
index 0000000000..056ecfd76c
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Maintain the Cluster-Heartbeat-Service: periodically ping all the registered nodes */
+public class HeartbeatService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);
+
+  private static final long HEARTBEAT_INTERVAL =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+  private final IManager configManager;
+
+  // Monitor for leadership change: starting and stopping the schedule synchronize on it
+  private final Object heartbeatScheduleMonitor = new Object();
+  // Handle of the scheduled heartbeat loop; null while the service is not running
+  private Future<?> currentHeartbeatFuture;
+  private final ScheduledExecutorService heartBeatExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
+  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+
+  private final HeartbeatSampleCache heartbeatSampleCache;
+
+  public HeartbeatService(IManager configManager) {
+    this.configManager = configManager;
+    this.heartbeatSampleCache = new HeartbeatSampleCache();
+  }
+
+  /** Start the heartbeat service; a no-op when it is already running */
+  public void startHeartbeatService() {
+    synchronized (heartbeatScheduleMonitor) {
+      if (currentHeartbeatFuture == null) {
+        currentHeartbeatFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                heartBeatExecutor,
+                this::heartbeatLoopBody,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("Heartbeat service is started successfully.");
+      }
+    }
+  }
+
+  /** Stop the heartbeat service and clear the sample cache; a no-op when it is not running */
+  public void stopHeartbeatService() {
+    synchronized (heartbeatScheduleMonitor) {
+      if (currentHeartbeatFuture != null) {
+        currentHeartbeatFuture.cancel(false);
+        currentHeartbeatFuture = null;
+        heartbeatSampleCache.clear();
+        LOGGER.info("Heartbeat service is stopped successfully.");
+      }
+    }
+  }
+
+  /** Generate the heartbeat request sent in the current heartbeat loop */
+  private THeartbeatReq genHeartbeatReq() {
+    THeartbeatReq heartbeatReq = new THeartbeatReq();
+    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+    // Always sample RegionGroups' leadership as the Region heartbeat
+    heartbeatReq.setNeedJudgeLeader(true);
+    // Sample DataNode's load once in every 10 heartbeat loops: the counter cycles through 0..9
+    // and is read and advanced in a single atomic step
+    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.getAndUpdate(x -> (x + 1) % 10) == 0);
+
+    if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
+      heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
+      heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
+      heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
+    }
+    return heartbeatReq;
+  }
+
+  /** Loop body of the heartbeat thread; requests are only sent while isLeader() holds */
+  private void heartbeatLoopBody() {
+    // The consensusManager of configManager may not be fully initialized at this time
+    Optional.ofNullable(getConsensusManager())
+        .ifPresent(
+            consensusManager -> {
+              if (consensusManager.isLeader()) {
+                THeartbeatReq heartbeatReq = genHeartbeatReq();
+                // Send heartbeat requests to all the registered ConfigNodes
+                pingRegisteredConfigNodes(
+                    heartbeatReq, getNodeManager().getRegisteredConfigNodes());
+                // Send heartbeat requests to all the registered DataNodes
+                pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
+              }
+            });
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered ConfigNodes
+   *
+   * @param heartbeatReq the request whose heartbeat timestamp is sent to each ConfigNode
+   * @param registeredConfigNodes ConfigNodes that registered in cluster
+   */
+  private void pingRegisteredConfigNodes(
+      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+      if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
+        // Skip itself
+        continue;
+      }
+      ConfigNodeHeartbeatHandler handler =
+          new ConfigNodeHeartbeatHandler(
+              configNodeLocation.getConfigNodeId(), heartbeatSampleCache);
+      AsyncConfigNodeHeartbeatClientPool.getInstance()
+          .getConfigNodeHeartBeat(
+              configNodeLocation.getInternalEndPoint(),
+              heartbeatReq.getHeartbeatTimestamp(),
+              handler);
+    }
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered DataNodes
+   *
+   * @param heartbeatReq the heartbeat request sent to each DataNode
+   * @param registeredDataNodes DataNodes that registered in cluster
+   */
+  private void pingRegisteredDataNodes(
+      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
+      DataNodeHeartbeatHandler handler =
+          new DataNodeHeartbeatHandler(
+              dataNodeInfo.getLocation().getDataNodeId(),
+              heartbeatSampleCache,
+              configManager.getClusterQuotaManager().getDeviceNum(),
+              configManager.getClusterQuotaManager().getTimeSeriesNum(),
+              configManager.getClusterQuotaManager().getRegionDisk());
+      configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
+      AsyncDataNodeHeartbeatClientPool.getInstance()
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+    }
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configManager.getConsensusManager();
+  }
+
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
+  }
+
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
index 24f9a9edb5..392ae24b6e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 
 import java.util.LinkedList;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
index 17fc9b1eb2..afd9d3e197 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 
 public class ConfigNodeHeartbeatCache extends BaseNodeCache {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
index 5754d27320..cb20836120 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
 
 /** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java
index dceff727bc..b3857c11f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java
index 706b40e3e8..6aff8394e9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
 
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+import static org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
 
 public class RegionCache {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
similarity index 96%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
index 7e490670a9..fb5cf7766f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.region;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java
similarity index 95%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java
index 8de58a5e93..88e94e4be2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java
similarity index 96%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java
index 627ac00a33..a0a8645209 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.statistics;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
index d36175bc67..d22ea0e4d6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java
index d30bf43d96..87bdf39f91 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
similarity index 53%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
index 8de58a5e93..79b8ad1869 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
@@ -16,34 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
 
-import org.apache.iotdb.commons.cluster.RegionStatus;
+package org.apache.iotdb.confignode.manager.load.statistics;
 
-public class RegionHeartbeatSample {
+public class StatisticsService {
 
-  // Unit: ms
-  private final long sendTimestamp;
-  private final long receiveTimestamp;
-  private final RegionStatus status;
-
-  // TODO: Add load sample
-
-  public RegionHeartbeatSample(long sendTimestamp, long receiveTimestamp, RegionStatus status) {
-    this.sendTimestamp = sendTimestamp;
-    this.receiveTimestamp = receiveTimestamp;
-    this.status = status;
-  }
-
-  public long getSendTimestamp() {
-    return sendTimestamp;
-  }
-
-  public long getReceiveTimestamp() {
-    return receiveTimestamp;
-  }
-
-  public RegionStatus getStatus() {
-    return status;
-  }
+  
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 97125db05a..62ce597f2f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -27,18 +27,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -60,9 +54,9 @@ import org.apache.iotdb.confignode.manager.TriggerManager;
 import org.apache.iotdb.confignode.manager.UDFManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -81,7 +75,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -98,9 +91,6 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -119,23 +109,12 @@ public class NodeManager {
 
   private final ReentrantLock removeConfigNodeLock;
 
-  /** Heartbeat executor service */
-  // Monitor for leadership change
-  private final Object scheduleMonitor = new Object();
-  // Map<NodeId, INodeCache>
-  private final Map<Integer, BaseNodeCache> nodeCacheMap;
-  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
-  private Future<?> currentHeartbeatFuture;
-  private final ScheduledExecutorService heartBeatExecutor =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
-
   private final Random random;
 
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
-    this.nodeCacheMap = new ConcurrentHashMap<>();
     this.random = new Random(System.currentTimeMillis());
   }
 
@@ -687,125 +666,6 @@ public class NodeManager {
     }
   }
 
-  /** Start the heartbeat service */
-  public void startHeartbeatService() {
-    synchronized (scheduleMonitor) {
-      if (currentHeartbeatFuture == null) {
-        currentHeartbeatFuture =
-            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                heartBeatExecutor,
-                this::heartbeatLoopBody,
-                0,
-                HEARTBEAT_INTERVAL,
-                TimeUnit.MILLISECONDS);
-        LOGGER.info("Heartbeat service is started successfully.");
-      }
-    }
-  }
-
-  /** loop body of the heartbeat thread */
-  private void heartbeatLoopBody() {
-    // The consensusManager of configManager may not be fully initialized at this time
-    Optional.ofNullable(getConsensusManager())
-        .ifPresent(
-            consensusManager -> {
-              if (getConsensusManager().isLeader()) {
-                // Generate HeartbeatReq
-                THeartbeatReq heartbeatReq = genHeartbeatReq();
-                // Send heartbeat requests to all the registered DataNodes
-                pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
-                // Send heartbeat requests to all the registered ConfigNodes
-                pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
-              }
-            });
-  }
-
-  private THeartbeatReq genHeartbeatReq() {
-    /* Generate heartbeat request */
-    THeartbeatReq heartbeatReq = new THeartbeatReq();
-    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
-    // Always sample RegionGroups' leadership as the Region heartbeat
-    heartbeatReq.setNeedJudgeLeader(true);
-    // We sample DataNode's load in every 10 heartbeat loop
-    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
-    /* Update heartbeat counter */
-    heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
-    if (!getClusterQuotaManager().hasSpaceQuotaLimit()) {
-      heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds());
-      heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds());
-      heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage());
-    }
-    return heartbeatReq;
-  }
-
-  /**
-   * Send heartbeat requests to all the Registered DataNodes
-   *
-   * @param registeredDataNodes DataNodes that registered in cluster
-   */
-  private void pingRegisteredDataNodes(
-      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
-    // Send heartbeat requests
-    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
-      DataNodeHeartbeatHandler handler =
-          new DataNodeHeartbeatHandler(
-              dataNodeInfo.getLocation(),
-              (DataNodeHeartbeatCache)
-                  nodeCacheMap.computeIfAbsent(
-                      dataNodeInfo.getLocation().getDataNodeId(),
-                      empty -> new DataNodeHeartbeatCache()),
-              getPartitionManager().getRegionGroupCacheMap(),
-              getLoadManager().getRouteBalancer(),
-              getClusterQuotaManager().getDeviceNum(),
-              getClusterQuotaManager().getTimeSeriesNum(),
-              getClusterQuotaManager().getRegionDisk());
-      getClusterQuotaManager().updateSpaceQuotaUsage();
-      AsyncDataNodeHeartbeatClientPool.getInstance()
-          .getDataNodeHeartBeat(
-              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
-    }
-  }
-
-  /**
-   * Send heartbeat requests to all the Registered ConfigNodes
-   *
-   * @param registeredConfigNodes ConfigNodes that registered in cluster
-   */
-  private void pingRegisteredConfigNodes(
-      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
-    // Send heartbeat requests
-    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
-      if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
-        // Skip itself
-        continue;
-      }
-
-      ConfigNodeHeartbeatHandler handler =
-          new ConfigNodeHeartbeatHandler(
-              (ConfigNodeHeartbeatCache)
-                  nodeCacheMap.computeIfAbsent(
-                      configNodeLocation.getConfigNodeId(),
-                      empty -> new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())));
-      AsyncConfigNodeHeartbeatClientPool.getInstance()
-          .getConfigNodeHeartBeat(
-              configNodeLocation.getInternalEndPoint(),
-              heartbeatReq.getHeartbeatTimestamp(),
-              handler);
-    }
-  }
-
-  /** Stop the heartbeat service */
-  public void stopHeartbeatService() {
-    synchronized (scheduleMonitor) {
-      if (currentHeartbeatFuture != null) {
-        currentHeartbeatFuture.cancel(false);
-        currentHeartbeatFuture = null;
-        nodeCacheMap.clear();
-        LOGGER.info("Heartbeat service is stopped successfully.");
-      }
-    }
-  }
 
   public Map<Integer, BaseNodeCache> getNodeCacheMap() {
     return nodeCacheMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
index a38adafd74..4c44fb07c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.confignode.manager.observer;
 
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 223bee15e4..a754ccf396 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -68,7 +68,7 @@ import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -128,15 +128,11 @@ public class PartitionManager {
   private final ScheduledExecutorService regionMaintainer;
   private Future<?> currentRegionMaintainerFuture;
 
-  // Map<RegionId, RegionGroupCache>
-  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
-
   public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
     this.configManager = configManager;
     this.partitionInfo = partitionInfo;
     this.regionMaintainer =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Maintainer");
-    this.regionGroupCacheMap = new ConcurrentHashMap<>();
     setSeriesPartitionExecutor();
   }
 
@@ -1070,10 +1066,6 @@ public class PartitionManager {
     }
   }
 
-  public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
-    return regionGroupCacheMap;
-  }
-
   public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
     regionGroupCacheMap.remove(consensusGroupId);
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index efa53d330d..5faefbcd4f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -49,10 +49,10 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index afa2d3c543..bf1ce7e940 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index cb617bf027..6da0304f44 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index a028219b08..f1bf444427 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index 4d93b6355b..b51cf10bcc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -19,8 +19,8 @@
 package org.apache.iotdb.confignode.manager.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
index 3c3cf4881b..da91dc82dc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.confignode.manager.partition;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
index 4181fa9d67..766a5ed48f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.confignode.persistence.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
index cce3ed250a..b0b97b2b0c 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.confignode.persistence.partition.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
index 8ccf87c7c2..4ad69092ef 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.confignode.persistence.partition.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;


[iotdb] 02/03: temporary relationship

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9841c8d1a02c729e8e9b6c125f588b4d84138ca
Author: YongzaoDan <53...@qq.com>
AuthorDate: Wed Apr 12 22:53:17 2023 +0800

    temporary relationship
---
 .../heartbeat/ConfigNodeHeartbeatHandler.java      |   7 +-
 .../heartbeat/DataNodeHeartbeatHandler.java        |  39 ++--
 .../statemachine/ConfigRegionStateMachine.java     |   9 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   2 +-
 .../iotdb/confignode/manager/ProcedureManager.java |   2 +-
 .../confignode/manager/RetryFailedTasksThread.java |   2 +-
 .../iotdb/confignode/manager/load/LoadCache.java   | 186 +++++++++++++++++
 .../iotdb/confignode/manager/load/LoadManager.java | 229 ++-------------------
 .../manager/load/balancer/RouteBalancer.java       |  28 +++
 .../load/heartbeat/HeartbeatSampleCache.java       | 123 -----------
 .../manager/load/heartbeat/HeartbeatService.java   |  95 ++++-----
 .../load/heartbeat/region/RegionGroupCache.java    |   2 +-
 .../load/statistics/RegionGroupStatistics.java     |   9 +-
 .../manager/load/statistics/StatisticsService.java | 191 ++++++++++++++++-
 .../iotdb/confignode/manager/node/NodeManager.java |  31 ---
 .../manager/partition/PartitionManager.java        |  12 --
 .../procedure/env/ConfigNodeProcedureEnv.java      |   4 +-
 .../statistics/RegionGroupStatisticsTest.java      |   2 +-
 18 files changed, 513 insertions(+), 460 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 92881210e7..86435e7755 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -18,8 +18,7 @@
  */
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.LoadCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -27,9 +26,9 @@ import org.apache.thrift.async.AsyncMethodCallback;
 public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
 
   private final int nodeId;
-  private final HeartbeatSampleCache cache;
+  private final LoadCache cache;
 
-  public ConfigNodeHeartbeatHandler(int nodeId, HeartbeatSampleCache cache) {
+  public ConfigNodeHeartbeatHandler(int nodeId, LoadCache cache) {
     this.nodeId = nodeId;
     this.cache = cache;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index bea4455313..83b0bc1c23 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -18,13 +18,10 @@
  */
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.LoadCache;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -37,7 +34,8 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
 
   private final int nodeId;
 
-  private final HeartbeatSampleCache heartbeatSampleCache;
+  private final LoadCache loadCache;
+  private final RouteBalancer routeBalancer;
 
   private final Map<Integer, Long> deviceNum;
   private final Map<Integer, Long> timeSeriesNum;
@@ -45,13 +43,15 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
 
   public DataNodeHeartbeatHandler(
       int nodeId,
-      HeartbeatSampleCache heartbeatSampleCache,
+      LoadCache loadCache,
+      RouteBalancer routeBalancer,
       Map<Integer, Long> deviceNum,
       Map<Integer, Long> timeSeriesNum,
       Map<Integer, Long> regionDisk) {
 
     this.nodeId = nodeId;
-    this.heartbeatSampleCache = heartbeatSampleCache;
+    this.loadCache = loadCache;
+    this.routeBalancer = routeBalancer;
     this.deviceNum = deviceNum;
     this.timeSeriesNum = timeSeriesNum;
     this.regionDisk = regionDisk;
@@ -62,26 +62,27 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
     long receiveTime = System.currentTimeMillis();
 
     // Update NodeCache
-    heartbeatSampleCache.cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime));
+    loadCache.cacheDataNodeHeartbeatSample(
+        nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime));
 
     heartbeatResp
         .getJudgedLeaders()
         .forEach(
             (regionGroupId, isLeader) -> {
               // Update RegionGroupCache
-              heartbeatSampleCache.cacheRegionHeartbeatSample(regionGroupId, nodeId,
-                new RegionHeartbeatSample(
-                  heartbeatResp.getHeartbeatTimestamp(),
-                  receiveTime,
-                  // Region will inherit DataNode's status
-                  RegionStatus.parse(heartbeatResp.getStatus())));
+              loadCache.cacheRegionHeartbeatSample(
+                  regionGroupId,
+                  nodeId,
+                  new RegionHeartbeatSample(
+                      heartbeatResp.getHeartbeatTimestamp(),
+                      receiveTime,
+                      // Region will inherit DataNode's status
+                      RegionStatus.parse(heartbeatResp.getStatus())));
 
               if (isLeader) {
                 // Update leaderCache
-                heartbeatSampleCache.cacheLeaderSample(
-                    regionGroupId,
-                    new Pair<>(
-                        heartbeatResp.getHeartbeatTimestamp(), nodeId));
+                routeBalancer.cacheLeaderSample(
+                    regionGroupId, new Pair<>(heartbeatResp.getHeartbeatTimestamp(), nodeId));
               }
             });
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 78282c4fa3..f6b3f25e14 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -204,13 +204,11 @@ public class ConfigRegionStateMachine
           newLeaderId,
           currentNodeTEndPoint);
 
-      // Always initiate all kinds of HeartbeatCache first
-      configManager.getLoadManager().initHeartbeatCache();
+      // Always start load services first
+      configManager.getLoadManager().startLoadServices();
 
       // Start leader scheduling services
       configManager.getProcedureManager().shiftExecutor(true);
-      configManager.getLoadManager().startHeartbeatService();
-      configManager.getLoadManager().startLoadStatisticsService();
       configManager.getLoadManager().getRouteBalancer().startRouteBalancingService();
       configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
       configManager.getPartitionManager().startRegionCleaner();
@@ -229,9 +227,8 @@ public class ConfigRegionStateMachine
           newLeaderId);
 
       // Stop leader scheduling services
+      configManager.getLoadManager().stopLoadServices();
       configManager.getProcedureManager().shiftExecutor(false);
-      configManager.getLoadManager().stopHeartbeatService();
-      configManager.getLoadManager().stopLoadStatisticsService();
       configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService();
       configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
       configManager.getPartitionManager().stopRegionCleaner();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index cf038e6dba..dab5101d1c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -85,10 +85,10 @@ import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachi
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.node.NodeMetrics;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index f9ac0230fc..5df812d4fb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -40,8 +40,8 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 3dab05219d..865b8b59c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
new file mode 100644
index 0000000000..03f002899c
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Maintains the heartbeat sample caches of all Nodes and RegionGroups. */
+public class LoadCache {
+
+  private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
+      ConsensusFactory.IOT_CONSENSUS.equals(
+          ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+
+  // Map<NodeId, BaseNodeCache>
+  private final Map<Integer, BaseNodeCache> nodeCacheMap;
+  // Map<RegionGroupId, RegionGroupCache>
+  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+
+  public LoadCache() {
+    this.nodeCacheMap = new ConcurrentHashMap<>();
+    this.regionGroupCacheMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a ConfigNode.
+   *
+   * @param nodeId the id of the ConfigNode
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+    nodeCacheMap
+        .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+        .cacheHeartbeatSample(sample);
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a DataNode.
+   *
+   * @param nodeId the id of the DataNode
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+    nodeCacheMap
+        .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache())
+        .cacheHeartbeatSample(sample);
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a RegionGroup.
+   *
+   * @param regionGroupId the id of the RegionGroup
+   * @param nodeId the id of the DataNode where specified Region resides
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheRegionHeartbeatSample(
+      TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) {
+    regionGroupCacheMap
+        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+        .cacheHeartbeatSample(nodeId, sample);
+  }
+
+  /**
+   * Invoked periodically to update the NodeStatistics of all Nodes.
+   *
+   * @return a map of changed NodeStatistics: NodeId to {@code Pair<current, previous>}
+   */
+  public Map<Integer, Pair<NodeStatistics, NodeStatistics>> updateNodeStatistics() {
+    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+        new ConcurrentHashMap<>();
+    nodeCacheMap.forEach(
+        (nodeId, nodeCache) -> {
+          NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
+          if (nodeCache.periodicUpdate()) {
+            // Update and record the changed NodeStatistics
+            differentNodeStatisticsMap.put(
+                nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
+          }
+        });
+    return differentNodeStatisticsMap;
+  }
+
+  /**
+   * Invoked periodically to update the RegionGroupStatistics of all RegionGroups.
+   *
+   * @return a map of changed RegionGroupStatistics
+   */
+  public Map<TConsensusGroupId, RegionGroupStatistics> updateRegionGroupStatistics() {
+    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+        new ConcurrentHashMap<>();
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          if (regionGroupCache.periodicUpdate()) {
+            // Update and record the changed RegionGroupStatistics
+            differentRegionGroupStatisticsMap.put(regionGroupId, regionGroupCache.getStatistics());
+          }
+        });
+    return differentRegionGroupStatisticsMap;
+  }
+
+  public void initHeartbeatCache(IManager configManager) {
+    initNodeHeartbeatCache(
+        configManager.getNodeManager().getRegisteredConfigNodes(),
+        configManager.getNodeManager().getRegisteredDataNodes());
+    initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+  }
+
+  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
+  private void initNodeHeartbeatCache(
+      List<TConfigNodeLocation> registeredConfigNodes,
+      List<TDataNodeConfiguration> registeredDataNodes) {
+    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
+    nodeCacheMap.clear();
+
+    // Init ConfigNodeHeartbeatCache
+    registeredConfigNodes.forEach(
+        configNodeLocation -> {
+          if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
+            nodeCacheMap.put(
+                configNodeLocation.getConfigNodeId(),
+                new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
+          }
+        });
+    // Forcibly set the cache of the current ConfigNode itself, which is never updated
+    nodeCacheMap.put(
+        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
+        new ConfigNodeHeartbeatCache(
+            CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+
+    // Init DataNodeHeartbeatCache
+    registeredDataNodes.forEach(
+        dataNodeConfiguration ->
+            nodeCacheMap.put(
+                dataNodeConfiguration.getLocation().getDataNodeId(), new DataNodeHeartbeatCache()));
+  }
+
+  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
+  private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) {
+    regionGroupCacheMap.clear();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionGroupCacheMap.put(
+                regionReplicaSet.getRegionId(),
+                new RegionGroupCache(regionReplicaSet.getRegionId())));
+  }
+
+  public void clearHeartbeatCache() {
+    nodeCacheMap.clear();
+    regionGroupCacheMap.clear();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 4b18d37a0a..15dc22c0f5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -21,39 +21,21 @@ package org.apache.iotdb.confignode.manager.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService;
 import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
@@ -62,13 +44,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 /**
  * The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
@@ -87,16 +64,11 @@ public class LoadManager {
   private final RouteBalancer routeBalancer;
 
   /** Cluster load services */
+  private final LoadCache loadCache;
+
   private final HeartbeatService heartbeatService;
   private final StatisticsService statisticsService;
 
-
-  /** Load statistics executor service */
-  private final Object statisticsScheduleMonitor = new Object();
-  private Future<?> currentLoadStatisticsFuture;
-  private final ScheduledExecutorService loadStatisticsExecutor =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
-
   private final EventBus eventBus =
       new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
 
@@ -107,8 +79,9 @@ public class LoadManager {
     this.partitionBalancer = new PartitionBalancer(configManager);
     this.routeBalancer = new RouteBalancer(configManager);
 
-    this.heartbeatService = new HeartbeatService(configManager);
-    this.statisticsService = new StatisticsService(configManager);
+    this.loadCache = new LoadCache();
+    this.heartbeatService = new HeartbeatService(configManager, loadCache);
+    this.statisticsService = new StatisticsService(configManager, loadCache, eventBus);
 
     eventBus.register(configManager.getClusterSchemaManager());
     eventBus.register(configManager.getSyncManager());
@@ -124,8 +97,8 @@ public class LoadManager {
    * @throws DatabaseNotExistsException If some specific StorageGroups don't exist
    */
   public CreateRegionGroupsPlan allocateRegionGroups(
-    Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
-    throws NotEnoughDataNodeException, DatabaseNotExistsException {
+      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
+      throws NotEnoughDataNodeException, DatabaseNotExistsException {
     return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
   }
 
@@ -136,8 +109,8 @@ public class LoadManager {
    * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-    Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
-    throws NoAvailableRegionGroupException {
+      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+      throws NoAvailableRegionGroupException {
     return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
   }
 
@@ -148,8 +121,8 @@ public class LoadManager {
    * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
-    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
-    throws NoAvailableRegionGroupException {
+      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
+      throws NoAvailableRegionGroupException {
     return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
@@ -189,182 +162,20 @@ public class LoadManager {
     return routeBalancer.getLatestRegionPriorityMap();
   }
 
-  /** Start the load statistics service */
-  public void startLoadStatisticsService() {
-    synchronized (heartbeatScheduleMonitor) {
-      if (currentLoadStatisticsFuture == null) {
-        currentLoadStatisticsFuture =
-            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                loadStatisticsExecutor,
-                this::updateLoadStatistics,
-                0,
-                HEARTBEAT_INTERVAL,
-                TimeUnit.MILLISECONDS);
-        LOGGER.info("LoadStatistics service is started successfully.");
-      }
-    }
-  }
-
-  /** Stop the load statistics service */
-  public void stopLoadStatisticsService() {
-    synchronized (heartbeatScheduleMonitor) {
-      if (currentLoadStatisticsFuture != null) {
-        currentLoadStatisticsFuture.cancel(false);
-        currentLoadStatisticsFuture = null;
-        LOGGER.info("LoadStatistics service is stopped successfully.");
-      }
-    }
-  }
-
-  private void updateLoadStatistics() {
-    // Broadcast the RegionRouteMap if some LoadStatistics has changed
-    boolean isNeedBroadcast = false;
-
-    // Update NodeStatistics:
-    // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
-    // means the previous NodeStatistics
-    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
-        new ConcurrentHashMap<>();
-    getNodeManager()
-        .getNodeCacheMap()
-        .forEach(
-            (nodeId, nodeCache) -> {
-              NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
-              if (nodeCache.periodicUpdate()) {
-                // Update and record the changed NodeStatistics
-                differentNodeStatisticsMap.put(
-                    nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
-              }
-            });
-    if (!differentNodeStatisticsMap.isEmpty()) {
-      isNeedBroadcast = true;
-      recordNodeStatistics(differentNodeStatisticsMap);
-      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
-    }
-
-    // Update RegionGroupStatistics
-    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
-        new ConcurrentHashMap<>();
-    getPartitionManager()
-        .getRegionGroupCacheMap()
-        .forEach(
-            (regionGroupId, regionGroupCache) -> {
-              if (regionGroupCache.periodicUpdate()) {
-                // Update and record the changed RegionGroupStatistics
-                differentRegionGroupStatisticsMap.put(
-                    regionGroupId, regionGroupCache.getStatistics());
-              }
-            });
-    if (!differentRegionGroupStatisticsMap.isEmpty()) {
-      isNeedBroadcast = true;
-      recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
-    }
-
-    // Update RegionRouteMap
-    if (routeBalancer.updateRegionRouteMap()) {
-      isNeedBroadcast = true;
-      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
-    }
-
-    if (isNeedBroadcast) {
-      broadcastLatestRegionRouteMap();
-    }
-  }
-
-  private void recordNodeStatistics(
-      Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
-    LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
-    for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
-        differentNodeStatisticsMap.entrySet()) {
-      LOGGER.info(
-          "[UpdateLoadStatistics]\t {}={}",
-          "nodeId{" + nodeCacheEntry.getKey() + "}",
-          nodeCacheEntry.getValue().left);
-    }
-  }
-
-  private void recordRegionGroupStatistics(
-      Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
-    LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
-    for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
-        differentRegionGroupStatisticsMap.entrySet()) {
-      LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
-      LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
-      for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
-          regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
-        LOGGER.info(
-            "[UpdateLoadStatistics]\t dataNodeId{}={}",
-            regionStatisticsEntry.getKey(),
-            regionStatisticsEntry.getValue());
-      }
-    }
-  }
-
-  private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
-    LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
-    for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
-        regionRouteMap.getRegionLeaderMap().entrySet()) {
-      LOGGER.info(
-          "[UpdateLoadStatistics]\t {}={}",
-          regionLeaderEntry.getKey(),
-          regionLeaderEntry.getValue());
-    }
-
-    LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
-    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
-        regionRouteMap.getRegionPriorityMap().entrySet()) {
-      LOGGER.info(
-          "[UpdateLoadStatistics]\t {}={}",
-          regionPriorityEntry.getKey(),
-          regionPriorityEntry.getValue().getDataNodeLocations().stream()
-              .map(TDataNodeLocation::getDataNodeId)
-              .collect(Collectors.toList()));
-    }
-  }
-
-  public void broadcastLatestRegionRouteMap() {
-    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = getLatestRegionRouteMap();
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
-    getNodeManager()
-        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
-        .forEach(
-            onlineDataNode ->
-                dataNodeLocationMap.put(
-                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
-
-    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
-    long broadcastTime = System.currentTimeMillis();
-
-    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(
-            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
-            dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+  public void startLoadServices() {
+    loadCache.initHeartbeatCache(configManager);
+    routeBalancer.initRegionRouteMap();
+    heartbeatService.startHeartbeatService();
+    statisticsService.startLoadStatisticsService();
   }
 
-  /** Initialize all kinds of the HeartbeatCache when the ConfigNode-Leader is switched */
-  public void initHeartbeatCache() {
-    getNodeManager().initNodeHeartbeatCache();
-    getPartitionManager().initRegionGroupHeartbeatCache();
-    routeBalancer.initRegionRouteMap();
+  public void stopLoadServices() {
+    heartbeatService.stopHeartbeatService();
+    statisticsService.stopLoadStatisticsService();
+    loadCache.clearHeartbeatCache();
   }
 
   public RouteBalancer getRouteBalancer() {
     return routeBalancer;
   }
-
-  private NodeManager getNodeManager() {
-    return configManager.getNodeManager();
-  }
-
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
-  }
-
-  public EventBus getEventBus() {
-    return eventBus;
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 025d29e6fe..4a27368459 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -91,6 +91,12 @@ public class RouteBalancer {
   private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
       ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
 
+  // Key: RegionGroupId
+  // Value: Pair<Timestamp, LeaderDataNodeId>, where
+  // the left value is the sampling timestamp
+  // and the right value is the id of the DataNode on which the leader resides.
+  private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
+
   private final IManager configManager;
 
   /** RegionRouteMap */
@@ -111,6 +117,7 @@ public class RouteBalancer {
   public RouteBalancer(IManager configManager) {
     this.configManager = configManager;
     this.regionRouteMap = new RegionRouteMap();
+    this.leaderCache = new ConcurrentHashMap<>();
 
     switch (CONF.getLeaderDistributionPolicy()) {
       case ILeaderBalancer.GREEDY_POLICY:
@@ -133,6 +140,27 @@ public class RouteBalancer {
     }
   }
 
+  /**
+   * Cache the latest leader of a RegionGroup. A cached sample is only replaced by a sample with
+   * a strictly newer timestamp; compare-and-replace is one atomic ConcurrentHashMap operation.
+   *
+   * @param regionGroupId the id of the RegionGroup
+   * @param leaderSample the latest leader of a RegionGroup
+   */
+  public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
+    if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+        && IS_DATA_REGION_IOT_CONSENSUS) {
+      // The leadership of IoTConsensus protocol is decided by ConfigNode-leader
+      return;
+    }
+
+    // Locking the cached Pair is unsafe: replace() swaps the monitor object between threads
+    leaderCache.merge(
+        regionGroupId,
+        leaderSample,
+        (cached, incoming) -> cached.getLeft() < incoming.getLeft() ? incoming : cached);
+  }
+
   /**
    * Invoking periodically to update the RegionRouteMap
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java
deleted file mode 100644
index 4fcb27d00d..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.heartbeat;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
-import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/** Maintain all kinds of heartbeat samples */
-public class HeartbeatSampleCache {
-
-  private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
-    ConsensusFactory.IOT_CONSENSUS.equals(ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
-
-  // Map<NodeId, INodeCache>
-  private final Map<Integer, BaseNodeCache> nodeCacheMap;
-  // Map<RegionGroupId, RegionGroupCache>
-  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
-  // Key: RegionGroupId
-  // Value: Pair<Timestamp, LeaderDataNodeId>, where
-  // the left value stands for sampling timestamp
-  // and the right value stands for the index of DataNode that leader resides.
-  private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
-
-  public HeartbeatSampleCache() {
-    this.nodeCacheMap = new ConcurrentHashMap<>();
-    this.regionGroupCacheMap = new ConcurrentHashMap<>();
-    this.leaderCache = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Cache the latest heartbeat sample of a ConfigNode.
-   *
-   * @param nodeId the id of the ConfigNode
-   * @param sample the latest heartbeat sample
-   */
-  public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
-    nodeCacheMap.computeIfAbsent(nodeId,
-      empty -> new ConfigNodeHeartbeatCache(nodeId))
-      .cacheHeartbeatSample(sample);
-  }
-
-  /**
-   * Cache the latest heartbeat sample of a DataNode.
-   *
-   * @param nodeId the id of the DataNode
-   * @param sample the latest heartbeat sample
-   */
-  public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
-    nodeCacheMap.computeIfAbsent(nodeId,
-      empty -> new DataNodeHeartbeatCache())
-      .cacheHeartbeatSample(sample);
-  }
-
-  /**
-   * Cache the latest heartbeat sample of a RegionGroup.
-   *
-   * @param regionGroupId the id of the RegionGroup
-   * @param nodeId the id of the DataNode where specified Region resides
-   * @param sample the latest heartbeat sample
-   */
-  public void cacheRegionHeartbeatSample(TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) {
-    regionGroupCacheMap.computeIfAbsent(regionGroupId,
-      empty -> new RegionGroupCache(regionGroupId))
-      .cacheHeartbeatSample(nodeId, sample);
-  }
-
-  /**
-   * Cache the latest leader of a RegionGroup.
-   *
-   * @param regionGroupId the id of the RegionGroup
-   * @param leaderSample the latest leader of a RegionGroup
-   */
-  public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
-    if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
-      && IS_DATA_REGION_IOT_CONSENSUS) {
-      // The leadership of IoTConsensus protocol is decided by ConfigNode-leader
-      return;
-    }
-
-    leaderCache.putIfAbsent(regionGroupId, leaderSample);
-    synchronized (leaderCache.get(regionGroupId)) {
-      if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) {
-        leaderCache.replace(regionGroupId, leaderSample);
-      }
-    }
-  }
-
-  public void clear() {
-    nodeCacheMap.clear();
-    regionGroupCacheMap.clear();
-    leaderCache.clear();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
index 056ecfd76c..b2aaaa15aa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
@@ -30,10 +30,11 @@ import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeart
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,23 +50,24 @@ public class HeartbeatService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);
 
-  private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+  private static final long HEARTBEAT_INTERVAL =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
 
   private final IManager configManager;
+  private final LoadCache loadCache;
 
   /** Heartbeat executor service */
   // Monitor for leadership change
   private final Object heartbeatScheduleMonitor = new Object();
+
   private Future<?> currentHeartbeatFuture;
   private final ScheduledExecutorService heartBeatExecutor =
-    IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
   private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
 
-  private final HeartbeatSampleCache heartbeatSampleCache;
-
-  public HeartbeatService(IManager configManager) {
+  public HeartbeatService(IManager configManager, LoadCache loadCache) {
     this.configManager = configManager;
-    this.heartbeatSampleCache = new HeartbeatSampleCache();
+    this.loadCache = loadCache;
   }
 
   /** Start the heartbeat service */
@@ -73,12 +75,12 @@ public class HeartbeatService {
     synchronized (heartbeatScheduleMonitor) {
       if (currentHeartbeatFuture == null) {
         currentHeartbeatFuture =
-          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-            heartBeatExecutor,
-            this::heartbeatLoopBody,
-            0,
-            HEARTBEAT_INTERVAL,
-            TimeUnit.MILLISECONDS);
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                heartBeatExecutor,
+                this::heartbeatLoopBody,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
         LOGGER.info("Heartbeat service is started successfully.");
       }
     }
@@ -90,12 +92,29 @@ public class HeartbeatService {
       if (currentHeartbeatFuture != null) {
         currentHeartbeatFuture.cancel(false);
         currentHeartbeatFuture = null;
-        heartbeatSampleCache.clear();
         LOGGER.info("Heartbeat service is stopped successfully.");
       }
     }
   }
 
+  /** Loop body of the heartbeat thread: the ConfigNode-leader pings every registered node. */
+  private void heartbeatLoopBody() {
+    // The consensusManager of configManager may not be fully initialized at this time
+    Optional.ofNullable(getConsensusManager())
+        .ifPresent(
+            consensusManager -> {
+              if (consensusManager.isLeader()) {
+                // Generate HeartbeatReq
+                THeartbeatReq heartbeatReq = genHeartbeatReq();
+                // Send heartbeat requests to all the registered ConfigNodes
+                pingRegisteredConfigNodes(
+                    heartbeatReq, getNodeManager().getRegisteredConfigNodes());
+                // Send heartbeat requests to all the registered DataNodes
+                pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
+              }
+            });
+  }
+
   private THeartbeatReq genHeartbeatReq() {
     /* Generate heartbeat request */
     THeartbeatReq heartbeatReq = new THeartbeatReq();
@@ -115,30 +134,13 @@ public class HeartbeatService {
     return heartbeatReq;
   }
 
-  /** loop body of the heartbeat thread */
-  private void heartbeatLoopBody() {
-    // The consensusManager of configManager may not be fully initialized at this time
-    Optional.ofNullable(getConsensusManager())
-      .ifPresent(
-        consensusManager -> {
-          if (getConsensusManager().isLeader()) {
-            // Generate HeartbeatReq
-            THeartbeatReq heartbeatReq = genHeartbeatReq();
-            // Send heartbeat requests to all the registered ConfigNodes
-            pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
-            // Send heartbeat requests to all the registered DataNodes
-            pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
-          }
-        });
-  }
-
   /**
    * Send heartbeat requests to all the Registered ConfigNodes
    *
    * @param registeredConfigNodes ConfigNodes that registered in cluster
    */
   private void pingRegisteredConfigNodes(
-    THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
     // Send heartbeat requests
     for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
       if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
@@ -147,12 +149,12 @@ public class HeartbeatService {
       }
 
       ConfigNodeHeartbeatHandler handler =
-        new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), heartbeatSampleCache);
+          new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), loadCache);
       AsyncConfigNodeHeartbeatClientPool.getInstance()
-        .getConfigNodeHeartBeat(
-          configNodeLocation.getInternalEndPoint(),
-          heartbeatReq.getHeartbeatTimestamp(),
-          handler);
+          .getConfigNodeHeartBeat(
+              configNodeLocation.getInternalEndPoint(),
+              heartbeatReq.getHeartbeatTimestamp(),
+              handler);
     }
   }
 
@@ -162,20 +164,20 @@ public class HeartbeatService {
    * @param registeredDataNodes DataNodes that registered in cluster
    */
   private void pingRegisteredDataNodes(
-    THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
     // Send heartbeat requests
     for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
       DataNodeHeartbeatHandler handler =
-        new DataNodeHeartbeatHandler(
-          dataNodeInfo.getLocation().getDataNodeId(),
-          heartbeatSampleCache,
-          configManager.getClusterQuotaManager().getDeviceNum(),
-          configManager.getClusterQuotaManager().getTimeSeriesNum(),
-          configManager.getClusterQuotaManager().getRegionDisk());
+          new DataNodeHeartbeatHandler(
+              dataNodeInfo.getLocation().getDataNodeId(),
+              loadCache,
+              configManager.getClusterQuotaManager().getDeviceNum(),
+              configManager.getClusterQuotaManager().getTimeSeriesNum(),
+              configManager.getClusterQuotaManager().getRegionDisk());
       configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
       AsyncDataNodeHeartbeatClientPool.getInstance()
-        .getDataNodeHeartBeat(
-          dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
     }
   }
 
@@ -186,5 +188,4 @@ public class HeartbeatService {
   private NodeManager getNodeManager() {
     return configManager.getNodeManager();
   }
-
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
index fb5cf7766f..adcb93a906 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.manager.load.heartbeat.region;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
 import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
index d22ea0e4d6..6a2f2db7ee 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.manager.load.statistics;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -32,8 +33,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class RegionGroupStatistics {
 
-  private RegionGroupStatus regionGroupStatus;
+  // The DataNodeId where the leader of current RegionGroup resides
+  private volatile int leaderId = -1;
+  // Indicate the routing priority of read/write requests for current RegionGroup.
+  // The replica with higher sorting order has higher priority.
+  // TODO: Might be split into readRouteMap and writeRouteMap in the future
+  private TRegionReplicaSet regionPriority;
 
+  private volatile RegionGroupStatus regionGroupStatus;
   private final Map<Integer, RegionStatistics> regionStatisticsMap;
 
   public RegionGroupStatistics() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
index 79b8ad1869..20c37536f7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
@@ -19,7 +19,196 @@
 
 package org.apache.iotdb.confignode.manager.load.statistics;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadCache;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 public class StatisticsService {
 
-  
+  private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class);
+
+  private static final long HEARTBEAT_INTERVAL =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+  private final IManager configManager;
+  private final RouteBalancer routeBalancer;
+  private final LoadCache loadCache;
+  private final EventBus eventBus;
+
+  public StatisticsService(IManager configManager, LoadCache loadCache, EventBus eventBus) {
+    this.configManager = configManager;
+    this.routeBalancer = configManager.getLoadManager().getRouteBalancer();
+    this.loadCache = loadCache;
+    this.eventBus = eventBus;
+  }
+
+  /** Load statistics executor service */
+  private final Object statisticsScheduleMonitor = new Object();
+
+  private Future<?> currentLoadStatisticsFuture;
+  private final ScheduledExecutorService loadStatisticsExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
+
+  /** Start the load statistics service */
+  public void startLoadStatisticsService() {
+    synchronized (statisticsScheduleMonitor) {
+      if (currentLoadStatisticsFuture == null) {
+        currentLoadStatisticsFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                loadStatisticsExecutor,
+                this::updateLoadStatistics,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("LoadStatistics service is started successfully.");
+      }
+    }
+  }
+
+  /** Stop the load statistics service */
+  public void stopLoadStatisticsService() {
+    synchronized (statisticsScheduleMonitor) {
+      if (currentLoadStatisticsFuture != null) {
+        currentLoadStatisticsFuture.cancel(false);
+        currentLoadStatisticsFuture = null;
+        LOGGER.info("LoadStatistics service is stopped successfully.");
+      }
+    }
+  }
+
+  private void updateLoadStatistics() {
+    // Broadcast the RegionRouteMap if some LoadStatistics has changed
+    boolean isNeedBroadcast = false;
+
+    // Update NodeStatistics:
+    // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
+    // means the previous NodeStatistics
+    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+        loadCache.updateNodeStatistics();
+    if (!differentNodeStatisticsMap.isEmpty()) {
+      isNeedBroadcast = true;
+      recordNodeStatistics(differentNodeStatisticsMap);
+      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
+    }
+
+    // Update RegionGroupStatistics
+    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+        loadCache.updateRegionGroupStatistics();
+    if (!differentRegionGroupStatisticsMap.isEmpty()) {
+      isNeedBroadcast = true;
+      recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+    }
+
+    // Update RegionRouteMap
+    if (routeBalancer.updateRegionRouteMap()) {
+      isNeedBroadcast = true;
+      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
+    }
+
+    if (isNeedBroadcast) {
+      broadcastLatestRegionRouteMap();
+    }
+  }
+
+  private void recordNodeStatistics(
+      Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
+    LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
+    for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
+        differentNodeStatisticsMap.entrySet()) {
+      LOGGER.info(
+          "[UpdateLoadStatistics]\t {}={}",
+          "nodeId{" + nodeCacheEntry.getKey() + "}",
+          nodeCacheEntry.getValue().left);
+    }
+  }
+
+  private void recordRegionGroupStatistics(
+      Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
+    LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
+    for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
+        differentRegionGroupStatisticsMap.entrySet()) {
+      LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
+      LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
+      for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
+          regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+        LOGGER.info(
+            "[UpdateLoadStatistics]\t dataNodeId{}={}",
+            regionStatisticsEntry.getKey(),
+            regionStatisticsEntry.getValue());
+      }
+    }
+  }
+
+  private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+    LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
+    for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
+        regionRouteMap.getRegionLeaderMap().entrySet()) {
+      LOGGER.info(
+          "[UpdateLoadStatistics]\t {}={}",
+          regionLeaderEntry.getKey(),
+          regionLeaderEntry.getValue());
+    }
+
+    LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
+    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
+        regionRouteMap.getRegionPriorityMap().entrySet()) {
+      LOGGER.info(
+          "[UpdateLoadStatistics]\t {}={}",
+          regionPriorityEntry.getKey(),
+          regionPriorityEntry.getValue().getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toList()));
+    }
+  }
+
+  public void broadcastLatestRegionRouteMap() {
+    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
+        routeBalancer.getLatestRegionPriorityMap();
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
+    configManager
+        .getNodeManager()
+        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
+        .forEach(
+            onlineDataNode ->
+                dataNodeLocationMap.put(
+                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
+
+    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
+    long broadcastTime = System.currentTimeMillis();
+
+    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+            dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 62ce597f2f..60e0134320 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -666,7 +666,6 @@ public class NodeManager {
     }
   }
 
-
   public Map<Integer, BaseNodeCache> getNodeCacheMap() {
     return nodeCacheMap;
   }
@@ -803,36 +802,6 @@ public class NodeManager {
     return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
   }
 
-  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
-  public void initNodeHeartbeatCache() {
-    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
-    nodeCacheMap.clear();
-
-    // Init ConfigNodeHeartbeatCache
-    getRegisteredConfigNodes()
-        .forEach(
-            configNodeLocation -> {
-              if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
-                nodeCacheMap.put(
-                    configNodeLocation.getConfigNodeId(),
-                    new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
-              }
-            });
-    // Force set itself and never update
-    nodeCacheMap.put(
-        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
-        new ConfigNodeHeartbeatCache(
-            CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
-
-    // Init DataNodeHeartbeatCache
-    getRegisteredDataNodes()
-        .forEach(
-            dataNodeConfiguration ->
-                nodeCacheMap.put(
-                    dataNodeConfiguration.getLocation().getDataNodeId(),
-                    new DataNodeHeartbeatCache()));
-  }
-
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index a754ccf396..fdec1487e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -68,7 +68,6 @@ import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -1149,17 +1148,6 @@ public class PartitionManager {
         : RegionGroupStatus.Disabled;
   }
 
-  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
-  public void initRegionGroupHeartbeatCache() {
-    regionGroupCacheMap.clear();
-    getAllReplicaSets()
-        .forEach(
-            regionReplicaSet ->
-                regionGroupCacheMap.put(
-                    regionReplicaSet.getRegionId(),
-                    new RegionGroupCache(regionReplicaSet.getRegionId())));
-  }
-
   public void getSchemaRegionIds(
       List<String> databases, Map<String, List<Integer>> schemaRegionIds) {
     partitionInfo.getSchemaRegionIds(databases, schemaRegionIds);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 5faefbcd4f..06aef7c2f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -48,11 +48,11 @@ import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
index b0b97b2b0c..dff0105f73 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
@@ -19,9 +19,9 @@
 package org.apache.iotdb.confignode.persistence.partition.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
 import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;


[iotdb] 03/03: ready for test

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94bed63697747879440b91679454da330f4c8dd0
Author: YongzaoDan <53...@qq.com>
AuthorDate: Thu Apr 13 22:05:14 2023 +0800

    ready for test
---
 .../iotdb/confignode/manager/ConfigManager.java    |  25 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  79 +++--
 .../confignode/manager/RetryFailedTasksThread.java |  21 +-
 .../iotdb/confignode/manager/load/LoadCache.java   | 357 ++++++++++++++++++---
 .../iotdb/confignode/manager/load/LoadManager.java | 183 +++++++++++
 .../manager/load/balancer/RegionBalancer.java      |   7 +-
 .../manager/load/balancer/RouteBalancer.java       |   7 +-
 .../manager/load/heartbeat/HeartbeatService.java   |   1 +
 .../manager/load/heartbeat/node/BaseNodeCache.java |   9 +-
 .../heartbeat/node/ConfigNodeHeartbeatCache.java   |  10 +-
 .../heartbeat/node/DataNodeHeartbeatCache.java     |   4 +-
 .../iotdb/confignode/manager/node/NodeManager.java | 149 +--------
 .../manager/partition/PartitionManager.java        | 146 +++------
 .../manager/partition/PartitionMetrics.java        |  18 +-
 .../manager/partition/RegionGroupStatus.java       |  25 +-
 .../confignode/persistence/node/NodeInfo.java      |  47 ++-
 .../partition/DatabasePartitionTable.java          |  36 ++-
 .../persistence/partition/PartitionInfo.java       |  50 ++-
 .../persistence/partition/RegionGroup.java         |  13 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  37 +--
 .../procedure/env/DataNodeRemoveHandler.java       |  15 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   2 +-
 .../impl/statemachine/RegionMigrateProcedure.java  |   3 +
 .../router/priority/GreedyPriorityTest.java        |   2 +-
 .../priority/LeaderPriorityBalancerTest.java       |   2 +-
 .../confignode/manager/node/NodeCacheTest.java     |   4 +-
 26 files changed, 835 insertions(+), 417 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index dab5101d1c..4a3d65565e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -398,10 +398,11 @@ public class ConfigManager implements IManager {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Force updating the target DataNode's status to Unknown
-      getNodeManager()
-          .getNodeCacheMap()
-          .get(dataNodeLocation.getDataNodeId())
-          .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      getLoadManager()
+          .forceUpdateNodeCache(
+              NodeType.DataNode,
+              dataNodeLocation.getDataNodeId(),
+              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
       LOGGER.info(
           "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
           dataNodeLocation.getDataNodeId());
@@ -442,12 +443,7 @@ public class ConfigManager implements IManager {
               .map(TDataNodeConfiguration::getLocation)
               .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
               .collect(Collectors.toList());
-      Map<Integer, String> nodeStatus = new HashMap<>();
-      getNodeManager()
-          .getNodeCacheMap()
-          .forEach(
-              (nodeId, heartbeatCache) ->
-                  nodeStatus.put(nodeId, heartbeatCache.getNodeStatusWithReason()));
+      Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
       return new TShowClusterResp(status, configNodeLocations, dataNodeInfoLocations, nodeStatus);
     } else {
       return new TShowClusterResp(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>());
@@ -1146,10 +1142,11 @@ public class ConfigManager implements IManager {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Force updating the target ConfigNode's status to Unknown
-      getNodeManager()
-          .getNodeCacheMap()
-          .get(configNodeLocation.getConfigNodeId())
-          .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      getLoadManager()
+          .forceUpdateNodeCache(
+              NodeType.ConfigNode,
+              configNodeLocation.getConfigNodeId(),
+              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
       LOGGER.info(
           "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
           configNodeLocation.getConfigNodeId());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5df812d4fb..5379ff8ee1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
@@ -94,7 +93,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -347,13 +345,33 @@ public class ProcedureManager {
   }
 
   public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
-    // TODO: Whether to guarantee the check high consistency, i.e, use consensus read to check
-    Map<TConsensusGroupId, RegionGroupCache> regionReplicaMap =
-        configManager.getPartitionManager().getRegionGroupCacheMap();
-    Optional<TConsensusGroupId> regionId =
-        regionReplicaMap.keySet().stream()
-            .filter(id -> id.getId() == migrateRegionReq.getRegionId())
-            .findAny();
+    TConsensusGroupId regionGroupId;
+    if (configManager
+        .getPartitionManager()
+        .isRegionGroupExists(
+            new TConsensusGroupId(
+                TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()))) {
+      regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId());
+    } else if (configManager
+        .getPartitionManager()
+        .isRegionGroupExists(
+            new TConsensusGroupId(
+                TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()))) {
+      regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId());
+    } else {
+      LOGGER.warn(
+          "Submit RegionMigrateProcedure failed, because RegionGroup: {} doesn't exist",
+          migrateRegionReq.getRegionId());
+      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist",
+              migrateRegionReq.getRegionId()));
+      return status;
+    }
+
     TDataNodeLocation originalDataNode =
         configManager
             .getNodeManager()
@@ -364,18 +382,7 @@ public class ProcedureManager {
             .getNodeManager()
             .getRegisteredDataNode(migrateRegionReq.getToId())
             .getLocation();
-    if (!regionId.isPresent()) {
-      LOGGER.warn(
-          "Submit RegionMigrateProcedure failed, because no Region {}",
-          migrateRegionReq.getRegionId());
-      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(
-          "Submit RegionMigrateProcedure failed, because no region Group "
-              + migrateRegionReq.getRegionId());
-      return status;
-    }
-    Set<Integer> dataNodesInRegion =
-        regionReplicaMap.get(regionId.get()).getStatistics().getRegionStatisticsMap().keySet();
+
     if (originalDataNode == null) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because no original DataNode {}",
@@ -394,7 +401,9 @@ public class ProcedureManager {
           "Submit RegionMigrateProcedure failed, because no target DataNode "
               + migrateRegionReq.getToId());
       return status;
-    } else if (!dataNodesInRegion.contains(migrateRegionReq.getFromId())) {
+    } else if (configManager.getPartitionManager()
+        .getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
+        .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}",
           migrateRegionReq.getFromId(),
@@ -406,7 +415,9 @@ public class ProcedureManager {
               + " doesn't contain Region "
               + migrateRegionReq.getRegionId());
       return status;
-    } else if (dataNodesInRegion.contains(migrateRegionReq.getToId())) {
+    } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
+        .stream()
+        .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}",
           migrateRegionReq.getToId(),
@@ -426,10 +437,8 @@ public class ProcedureManager {
             .map(TDataNodeConfiguration::getLocation)
             .map(TDataNodeLocation::getDataNodeId)
             .collect(Collectors.toSet());
-    if (configManager
-        .getNodeManager()
-        .getNodeStatusByNodeId(migrateRegionReq.getFromId())
-        .equals(NodeStatus.Unknown)) {
+    if (NodeStatus.Unknown.equals(
+        configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the sourceDataNode {} is Unknown.",
           migrateRegionReq.getFromId());
@@ -440,18 +449,8 @@ public class ProcedureManager {
               + " is Unknown.");
       return status;
     }
-    dataNodesInRegion.retainAll(aliveDataNodes);
-    if (dataNodesInRegion.isEmpty()) {
-      LOGGER.warn(
-          "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group {} is unavailable.",
-          migrateRegionReq.getRegionId());
-      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(
-          "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group "
-              + migrateRegionReq.getRegionId()
-              + " are unavailable.");
-      return status;
-    } else if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
+
+    if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the destDataNode {} is ReadOnly or Unknown.",
           migrateRegionReq.getToId());
@@ -463,7 +462,7 @@ public class ProcedureManager {
       return status;
     }
     this.executor.submitProcedure(
-        new RegionMigrateProcedure(regionId.get(), originalDataNode, destDataNode));
+        new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode));
     LOGGER.info(
         "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
         migrateRegionReq.getRegionId(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 865b8b59c2..287cda6eb1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -63,6 +63,7 @@ public class RetryFailedTasksThread {
   private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
   private final IManager configManager;
   private final NodeManager nodeManager;
+  private final LoadManager loadManager;
   private final ScheduledExecutorService retryFailTasksExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-RetryFailedTasks-Service");
   private final Object scheduleMonitor = new Object();
@@ -78,6 +79,7 @@ public class RetryFailedTasksThread {
   public RetryFailedTasksThread(IManager configManager) {
     this.configManager = configManager;
     this.nodeManager = configManager.getNodeManager();
+    this.loadManager = configManager.getLoadManager();
     this.oldUnknownNodes = new HashSet<>();
   }
 
@@ -131,15 +133,12 @@ public class RetryFailedTasksThread {
         .forEach(
             DataNodeConfiguration -> {
               TDataNodeLocation dataNodeLocation = DataNodeConfiguration.getLocation();
-              BaseNodeCache newestNodeInformation =
-                  nodeManager.getNodeCacheMap().get(dataNodeLocation.dataNodeId);
-              if (newestNodeInformation != null) {
-                if (newestNodeInformation.getNodeStatus() == NodeStatus.Running) {
-                  oldUnknownNodes.remove(dataNodeLocation);
-                } else if (!oldUnknownNodes.contains(dataNodeLocation)
-                    && newestNodeInformation.getNodeStatus() == NodeStatus.Unknown) {
-                  newUnknownNodes.add(dataNodeLocation);
-                }
+              NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeLocation.getDataNodeId());
+              if (nodeStatus == NodeStatus.Running) {
+                oldUnknownNodes.remove(dataNodeLocation);
+              } else if (!oldUnknownNodes.contains(dataNodeLocation)
+                  && nodeStatus == NodeStatus.Unknown) {
+                newUnknownNodes.add(dataNodeLocation);
               }
             });
 
@@ -163,7 +162,7 @@ public class RetryFailedTasksThread {
   private void syncDetectTask() {
     for (Map.Entry<Integer, Queue<TOperatePipeOnDataNodeReq>> entry : messageMap.entrySet()) {
       int dataNodeId = entry.getKey();
-      if (NodeStatus.Running.equals(nodeManager.getNodeStatusByNodeId(dataNodeId))) {
+      if (NodeStatus.Running.equals(loadManager.getNodeStatus(dataNodeId))) {
         final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
         dataNodeLocationMap.put(
             dataNodeId, nodeManager.getRegisteredDataNodeLocations().get(dataNodeId));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
index 03f002899c..0daa3846f5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadCache.java
@@ -21,9 +21,12 @@ package org.apache.iotdb.confignode.manager.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
@@ -33,20 +36,21 @@ import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCach
 import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
 import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
-import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /** Maintain all kinds of heartbeat samples */
 public class LoadCache {
 
-  private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
-      ConsensusFactory.IOT_CONSENSUS.equals(
-          ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
-
   // Map<NodeId, INodeCache>
   private final Map<Integer, BaseNodeCache> nodeCacheMap;
   // Map<RegionGroupId, RegionGroupCache>
@@ -57,6 +61,58 @@ public class LoadCache {
     this.regionGroupCacheMap = new ConcurrentHashMap<>();
   }
 
+  public void initHeartbeatCache(IManager configManager) {
+    initNodeHeartbeatCache(
+        configManager.getNodeManager().getRegisteredConfigNodes(),
+        configManager.getNodeManager().getRegisteredDataNodes());
+    initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+  }
+
+  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
+  private void initNodeHeartbeatCache(
+      List<TConfigNodeLocation> registeredConfigNodes,
+      List<TDataNodeConfiguration> registeredDataNodes) {
+
+    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
+    nodeCacheMap.clear();
+
+    // Init ConfigNodeHeartbeatCache
+    registeredConfigNodes.forEach(
+        configNodeLocation -> {
+          int configNodeId = configNodeLocation.getConfigNodeId();
+          if (configNodeId != CURRENT_NODE_ID) {
+            nodeCacheMap.put(configNodeId, new ConfigNodeHeartbeatCache(configNodeId));
+          }
+        });
+    // Force set itself and never update
+    nodeCacheMap.put(
+        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
+        new ConfigNodeHeartbeatCache(
+            CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+
+    // Init DataNodeHeartbeatCache
+    registeredDataNodes.forEach(
+        dataNodeConfiguration -> {
+          int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+          nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId));
+        });
+  }
+
+  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
+  private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) {
+    regionGroupCacheMap.clear();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionGroupCacheMap.put(
+                regionReplicaSet.getRegionId(),
+                new RegionGroupCache(regionReplicaSet.getRegionId())));
+  }
+
+  public void clearHeartbeatCache() {
+    nodeCacheMap.clear();
+    regionGroupCacheMap.clear();
+  }
+
   /**
    * Cache the latest heartbeat sample of a ConfigNode.
    *
@@ -77,7 +133,7 @@ public class LoadCache {
    */
   public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
     nodeCacheMap
-        .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache())
+        .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
         .cacheHeartbeatSample(sample);
   }
 
@@ -133,54 +189,263 @@ public class LoadCache {
     return differentRegionGroupStatisticsMap;
   }
 
-  public void initHeartbeatCache(IManager configManager) {
-    initNodeHeartbeatCache(
-        configManager.getNodeManager().getRegisteredConfigNodes(),
-        configManager.getNodeManager().getRegisteredDataNodes());
-    initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+  /**
+   * Safely get NodeStatus by NodeId
+   *
+   * @param nodeId The specified NodeId
+   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+   */
+  public NodeStatus getNodeStatus(int nodeId) {
+    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
   }
 
-  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
-  private void initNodeHeartbeatCache(
-      List<TConfigNodeLocation> registeredConfigNodes,
-      List<TDataNodeConfiguration> registeredDataNodes) {
-    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
-    nodeCacheMap.clear();
+  /**
+   * Safely get the specified Node's current status with reason
+   *
+   * @param nodeId The specified NodeId
+   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
+   */
+  public String getNodeStatusWithReason(int nodeId) {
+    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null
+        ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
+        : nodeCache.getNodeStatusWithReason();
+  }
 
-    // Init ConfigNodeHeartbeatCache
-    registeredConfigNodes.forEach(
-        configNodeLocation -> {
-          if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
-            nodeCacheMap.put(
-                configNodeLocation.getConfigNodeId(),
-                new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
+  /**
+   * Get all Node's current status with reason
+   *
+   * @return Map<NodeId, NodeStatus with reason>
+   */
+  public Map<Integer, String> getNodeStatusWithReason() {
+    return nodeCacheMap.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getNodeStatusWithReason()));
+  }
+
+  /**
+   * Filter ConfigNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
+   */
+  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+    return nodeCacheMap.entrySet().stream()
+        .filter(
+            nodeCacheEntry ->
+                nodeCacheEntry.getValue() instanceof ConfigNodeHeartbeatCache
+                    && Arrays.stream(status)
+                        .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Filter DataNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
+   */
+  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+    return nodeCacheMap.entrySet().stream()
+        .filter(
+            nodeCacheEntry ->
+                nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache
+                    && Arrays.stream(status)
+                        .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the free disk space of the specified DataNode
+   *
+   * @param dataNodeId The index of the specified DataNode
+   * @return The free disk space sampled through heartbeat, 0 if no heartbeat received
+   */
+  public double getFreeDiskSpace(int dataNodeId) {
+    DataNodeHeartbeatCache dataNodeHeartbeatCache =
+        (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
+    return dataNodeHeartbeatCache == null ? 0d : dataNodeHeartbeatCache.getFreeDiskSpace();
+  }
+
+  /**
+   * Get the loadScore of each DataNode
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllDataNodeLoadScores() {
+    Map<Integer, Long> result = new ConcurrentHashMap<>();
+    nodeCacheMap.forEach(
+        (dataNodeId, heartbeatCache) -> {
+          if (heartbeatCache instanceof DataNodeHeartbeatCache) {
+            result.put(dataNodeId, heartbeatCache.getLoadScore());
           }
         });
-    // Force set itself and never update
-    nodeCacheMap.put(
-        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
-        new ConfigNodeHeartbeatCache(
-            CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+    return result;
+  }
 
-    // Init DataNodeHeartbeatCache
-    registeredDataNodes.forEach(
-        dataNodeConfiguration ->
-            nodeCacheMap.put(
-                dataNodeConfiguration.getLocation().getDataNodeId(), new DataNodeHeartbeatCache()));
+  /**
+   * Get the lowest loadScore DataNode
+   *
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode() {
+    return nodeCacheMap.entrySet().stream()
+        .filter(nodeCacheEntry -> nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache)
+        .min(Comparator.comparingLong(nodeCacheEntry -> nodeCacheEntry.getValue().getLoadScore()))
+        .map(Map.Entry::getKey)
+        .orElse(-1);
   }
 
-  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
-  private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) {
-    regionGroupCacheMap.clear();
-    regionReplicaSets.forEach(
-        regionReplicaSet ->
-            regionGroupCacheMap.put(
-                regionReplicaSet.getRegionId(),
-                new RegionGroupCache(regionReplicaSet.getRegionId())));
+  /**
+   * Get the lowest loadScore DataNode from the specified DataNodes
+   *
+   * @param dataNodeIds The specified DataNodes
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+    return dataNodeIds.stream()
+        .map(nodeCacheMap::get)
+        .filter(Objects::nonNull)
+        .min(Comparator.comparingLong(BaseNodeCache::getLoadScore))
+        .map(BaseNodeCache::getNodeId)
+        .orElse(-1);
   }
 
-  public void clearHeartbeatCache() {
-    nodeCacheMap.clear();
-    regionGroupCacheMap.clear();
+  /**
+   * Force update the specified Node's cache
+   *
+   * @param nodeType Specified NodeType
+   * @param nodeId Specified NodeId
+   * @param heartbeatSample Specified NodeHeartbeatSample
+   */
+  public void forceUpdateNodeCache(
+      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+    switch (nodeType) {
+      case ConfigNode:
+        nodeCacheMap
+            .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+            .forceUpdate(heartbeatSample);
+        break;
+      case DataNode:
+        nodeCacheMap
+            .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
+            .forceUpdate(heartbeatSample);
+        break;
+    }
+  }
+
+  /** Remove the specified Node's cache */
+  public void removeNodeCache(int nodeId) {
+    nodeCacheMap.remove(nodeId);
+  }
+
+  /**
+   * Safely get RegionStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
+        : RegionStatus.Unknown;
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
+        : RegionGroupStatus.Disabled;
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupIds Specified RegionGroupIds
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+      List<TConsensusGroupId> consensusGroupIds) {
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new ConcurrentHashMap<>();
+    for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
+      regionGroupStatusMap.put(consensusGroupId, getRegionGroupStatus(consensusGroupId));
+    }
+    return regionGroupStatusMap;
+  }
+
+  /**
+   * Filter the RegionGroups through the RegionGroupStatus
+   *
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
+   */
+  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+    return regionGroupCacheMap.entrySet().stream()
+        .filter(
+            regionGroupCacheEntry ->
+                Arrays.stream(status)
+                    .anyMatch(
+                        s ->
+                            s.equals(
+                                regionGroupCacheEntry
+                                    .getValue()
+                                    .getStatistics()
+                                    .getRegionGroupStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Count the number of cluster Regions with specified RegionStatus
+   *
+   * @param type The specified RegionGroupType
+   * @param status The specified statuses
+   * @return The number of cluster Regions with specified RegionStatus
+   */
+  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
+    AtomicInteger result = new AtomicInteger(0);
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          if (type.equals(regionGroupId.getType())) {
+            regionGroupCache
+                .getStatistics()
+                .getRegionStatisticsMap()
+                .values()
+                .forEach(
+                    regionStatistics -> {
+                      if (Arrays.stream(status)
+                          .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) {
+                        result.getAndIncrement();
+                      }
+                    });
+          }
+        });
+    return result.get();
+  }
+
+  /**
+   * Force update the specified RegionGroup's cache
+   *
+   * @param regionGroupId Specified RegionGroupId
+   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+   */
+  public void forceUpdateRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
+    regionGroupCacheMap
+        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+        .forceUpdate(heartbeatSampleMap);
+  }
+
+  /** Remove the specified RegionGroup's cache */
+  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+    regionGroupCacheMap.remove(consensusGroupId);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 15dc22c0f5..f66b805703 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
@@ -34,7 +37,10 @@ import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 
 import com.google.common.eventbus.AsyncEventBus;
@@ -162,6 +168,10 @@ public class LoadManager {
     return routeBalancer.getLatestRegionPriorityMap();
   }
 
+  public void broadcastLatestRegionRouteMap() {
+    statisticsService.broadcastLatestRegionRouteMap();
+  }
+
   public void startLoadServices() {
     loadCache.initHeartbeatCache(configManager);
     routeBalancer.initRegionRouteMap();
@@ -178,4 +188,177 @@ public class LoadManager {
   public RouteBalancer getRouteBalancer() {
     return routeBalancer;
   }
+
+  /**
+   * Safely get NodeStatus by NodeId
+   *
+   * @param nodeId The specified NodeId
+   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+   */
+  public NodeStatus getNodeStatus(int nodeId) {
+    return loadCache.getNodeStatus(nodeId);
+  }
+
+  /**
+   * Safely get the specified Node's current status with reason
+   *
+   * @param nodeId The specified NodeId
+   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
+   */
+  public String getNodeStatusWithReason(int nodeId) {
+    return loadCache.getNodeStatusWithReason(nodeId);
+  }
+
+  /**
+   * Get all Node's current status with reason
+   *
+   * @return Map<NodeId, NodeStatus with reason>
+   */
+  public Map<Integer, String> getNodeStatusWithReason() {
+    return loadCache.getNodeStatusWithReason();
+  }
+
+  /**
+   * Filter ConfigNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
+   */
+  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+    return loadCache.filterConfigNodeThroughStatus(status);
+  }
+
+  /**
+   * Filter DataNodes through the specified NodeStatus
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
+   */
+  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+    return loadCache.filterDataNodeThroughStatus(status);
+  }
+
+  /**
+   * Get the free disk space of the specified DataNode
+   *
+   * @param dataNodeId The index of the specified DataNode
+   * @return The free disk space sampled through heartbeat, 0 if no heartbeat received
+   */
+  public double getFreeDiskSpace(int dataNodeId) {
+    return loadCache.getFreeDiskSpace(dataNodeId);
+  }
+
+  /**
+   * Get the loadScore of each DataNode
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllDataNodeLoadScores() {
+    return loadCache.getAllDataNodeLoadScores();
+  }
+
+  /**
+   * Get the lowest loadScore DataNode
+   *
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode() {
+    return loadCache.getLowestLoadDataNode();
+  }
+
+  /**
+   * Get the lowest loadScore DataNode from the specified DataNodes
+   *
+   * @param dataNodeIds The specified DataNodes
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+    return loadCache.getLowestLoadDataNode(dataNodeIds);
+  }
+
+  /**
+   * Force update the specified Node's cache
+   *
+   * @param nodeType Specified NodeType
+   * @param nodeId Specified NodeId
+   * @param heartbeatSample Specified NodeHeartbeatSample
+   */
+  public void forceUpdateNodeCache(
+      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+    loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
+  }
+
+  /** Remove the specified Node's cache */
+  public void removeNodeCache(int nodeId) {
+    loadCache.removeNodeCache(nodeId);
+  }
+
+  /**
+   * Safely get RegionStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+    return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+    return loadCache.getRegionGroupStatus(consensusGroupId);
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupIds Specified RegionGroupIds
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+      List<TConsensusGroupId> consensusGroupIds) {
+    return loadCache.getRegionGroupStatus(consensusGroupIds);
+  }
+
+  /**
+   * Filter the RegionGroups through the RegionGroupStatus
+   *
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
+   */
+  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+    return loadCache.filterRegionGroupThroughStatus(status);
+  }
+
+  /**
+   * Count the number of cluster Regions with specified RegionStatus
+   *
+   * @param type The specified RegionGroupType
+   * @param status The specified statuses
+   * @return The number of cluster Regions with specified RegionStatus
+   */
+  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
+    return loadCache.countRegionWithSpecifiedStatus(type, status);
+  }
+
+  /**
+   * Force update the specified RegionGroup's cache
+   *
+   * @param regionGroupId Specified RegionGroupId
+   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+   */
+  public void forceUpdateRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
+    loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
+  }
+
+  /** Remove the specified RegionGroup's cache */
+  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+    loadCache.removeRegionGroupCache(consensusGroupId);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 882a5eeebe..f6d1524df0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
@@ -111,7 +112,7 @@ public class RegionBalancer {
             dataNodeConfiguration -> {
               int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
               availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
-              freeDiskSpaceMap.put(dataNodeId, getNodeManager().getFreeDiskSpace(dataNodeId));
+              freeDiskSpaceMap.put(dataNodeId, getLoadManager().getFreeDiskSpace(dataNodeId));
             });
 
         // Generate allocation plan
@@ -145,6 +146,10 @@ public class RegionBalancer {
     return configManager.getPartitionManager();
   }
 
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
+
   public enum RegionGroupAllocatePolicy {
     COPY_SET,
     GREEDY
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 4a27368459..a87cb051ba 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
@@ -193,7 +194,7 @@ public class RouteBalancer {
 
   private boolean updateRegionPriorityMap() {
     Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap();
-    Map<Integer, Long> dataNodeLoadScoreMap = getNodeManager().getAllLoadScores();
+    Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();
 
     // Balancing region priority in each SchemaRegionGroup
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap =
@@ -412,4 +413,8 @@ public class RouteBalancer {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
index b2aaaa15aa..1ad268c5bd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
@@ -171,6 +171,7 @@ public class HeartbeatService {
           new DataNodeHeartbeatHandler(
               dataNodeInfo.getLocation().getDataNodeId(),
               loadCache,
+              configManager.getLoadManager().getRouteBalancer(),
               configManager.getClusterQuotaManager().getDeviceNum(),
               configManager.getClusterQuotaManager().getTimeSeriesNum(),
               configManager.getClusterQuotaManager().getRegionDisk());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
index 392ae24b6e..d55bf144d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
@@ -32,6 +32,8 @@ public abstract class BaseNodeCache {
   // Max heartbeat cache samples store size
   public static final int MAXIMUM_WINDOW_SIZE = 100;
 
+  protected final int nodeId;
+
   // SlidingWindow stores the heartbeat sample data
   protected final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
 
@@ -42,7 +44,8 @@ public abstract class BaseNodeCache {
   protected volatile NodeStatistics currentStatistics;
 
   /** Constructor for NodeCache with default NodeStatistics */
-  protected BaseNodeCache() {
+  protected BaseNodeCache(int nodeId) {
+    this.nodeId = nodeId;
     this.previousStatistics = NodeStatistics.generateDefaultNodeStatistics();
     this.currentStatistics = NodeStatistics.generateDefaultNodeStatistics();
   }
@@ -108,6 +111,10 @@ public abstract class BaseNodeCache {
    */
   protected abstract void updateCurrentStatistics();
 
+  public int getNodeId() {
+    return nodeId;
+  }
+
   /**
    * TODO: The loadScore of each Node will be changed to Double
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
index afd9d3e197..0bb831c0dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
@@ -31,18 +31,14 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
   public static final NodeStatistics CURRENT_NODE_STATISTICS =
       new NodeStatistics(0, NodeStatus.Running, null);
 
-  private final int configNodeId;
-
   /** Constructor for create ConfigNodeHeartbeatCache with default NodeStatistics */
   public ConfigNodeHeartbeatCache(int configNodeId) {
-    super();
-    this.configNodeId = configNodeId;
+    super(configNodeId);
   }
 
   /** Constructor only for ConfigNode-leader */
   public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) {
-    super();
-    this.configNodeId = configNodeId;
+    super(configNodeId);
     this.previousStatistics = statistics;
     this.currentStatistics = statistics;
   }
@@ -50,7 +46,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
   @Override
   protected void updateCurrentStatistics() {
     // Skip itself
-    if (configNodeId == CURRENT_NODE_ID) {
+    if (nodeId == CURRENT_NODE_ID) {
       return;
     }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
index cb20836120..aa36677983 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
@@ -28,8 +28,8 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
   private volatile TLoadSample latestLoadSample;
 
   /** Constructor for create DataNodeHeartbeatCache with default NodeStatistics */
-  public DataNodeHeartbeatCache() {
-    super();
+  public DataNodeHeartbeatCache(int dataNodeId) {
+    super(dataNodeId);
     this.latestLoadSample = new TLoadSample();
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 60e0134320..1482231c87 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.manager.ClusterQuotaManager;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.IManager;
@@ -54,9 +53,7 @@ import org.apache.iotdb.confignode.manager.TriggerManager;
 import org.apache.iotdb.confignode.manager.UDFManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -82,19 +79,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /** NodeManager manages cluster node addition and removal requests */
 public class NodeManager {
@@ -109,13 +102,10 @@ public class NodeManager {
 
   private final ReentrantLock removeConfigNodeLock;
 
-  private final Random random;
-
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
-    this.random = new Random(System.currentTimeMillis());
   }
 
   /**
@@ -365,15 +355,6 @@ public class NodeManager {
     return nodeInfo.getRegisteredDataNodeCount();
   }
 
-  /**
-   * Only leader use this interface
-   *
-   * @return The number of total cpu cores in online DataNodes
-   */
-  public int getTotalCpuCoreCount() {
-    return nodeInfo.getTotalCpuCoreCount();
-  }
-
   /**
    * Only leader use this interface
    *
@@ -417,7 +398,7 @@ public class NodeManager {
             TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
             int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
             dataNodeInfo.setDataNodeId(dataNodeId);
-            dataNodeInfo.setStatus(getNodeStatusWithReason(dataNodeId));
+            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
             dataNodeInfo.setRpcAddresss(
                 registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
             dataNodeInfo.setRpcPort(
@@ -481,7 +462,7 @@ public class NodeManager {
             TConfigNodeInfo info = new TConfigNodeInfo();
             int configNodeId = configNodeLocation.getConfigNodeId();
             info.setConfigNodeId(configNodeId);
-            info.setStatus(getNodeStatusWithReason(configNodeId));
+            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
             info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
             info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
             info.setRoleType(
@@ -666,98 +647,25 @@ public class NodeManager {
     }
   }
 
-  public Map<Integer, BaseNodeCache> getNodeCacheMap() {
-    return nodeCacheMap;
-  }
-
-  public void removeNodeCache(int nodeId) {
-    nodeCacheMap.remove(nodeId);
-  }
-
-  /**
-   * Safely get the specific Node's current status for showing cluster
-   *
-   * @param nodeId The specific Node's index
-   * @return The specific Node's current status if the nodeCache contains it, Unknown otherwise
-   */
-  private String getNodeStatusWithReason(int nodeId) {
-    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
-    return nodeCache == null
-        ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
-        : nodeCache.getNodeStatusWithReason();
-  }
-
   /**
-   * Filter the registered ConfigNodes through the specific NodeStatus
+   * Filter ConfigNodes through the specified NodeStatus
    *
-   * @param status The specific NodeStatus
-   * @return Filtered ConfigNodes with the specific NodeStatus
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
    */
   public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
-    return getRegisteredConfigNodes().stream()
-        .filter(
-            registeredConfigNode -> {
-              int configNodeId = registeredConfigNode.getConfigNodeId();
-              return nodeCacheMap.containsKey(configNodeId)
-                  && Arrays.stream(status)
-                      .anyMatch(s -> s.equals(nodeCacheMap.get(configNodeId).getNodeStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get NodeStatus by nodeId
-   *
-   * @param nodeId The specific NodeId
-   * @return NodeStatus of the specific node. If node does not exist, return null.
-   */
-  public NodeStatus getNodeStatusByNodeId(int nodeId) {
-    BaseNodeCache baseNodeCache = nodeCacheMap.get(nodeId);
-    return baseNodeCache == null ? null : baseNodeCache.getNodeStatus();
+    return nodeInfo.getRegisteredConfigNodes(
+        getLoadManager().filterConfigNodeThroughStatus(status));
   }
 
   /**
-   * Filter the registered DataNodes through the specific NodeStatus
+   * Filter DataNodes through the specified NodeStatus
    *
-   * @param status The specific NodeStatus
-   * @return Filtered DataNodes with the specific NodeStatus
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
    */
   public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
-    return getRegisteredDataNodes().stream()
-        .filter(
-            registeredDataNode -> {
-              int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
-              return nodeCacheMap.containsKey(dataNodeId)
-                  && Arrays.stream(status)
-                      .anyMatch(s -> s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get the loadScore of each DataNode
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllLoadScores() {
-    Map<Integer, Long> result = new ConcurrentHashMap<>();
-
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
-
-    return result;
-  }
-
-  /**
-   * Get the free disk space of the specified DataNode
-   *
-   * @param dataNodeId The index of the specified DataNode
-   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
-   */
-  public double getFreeDiskSpace(int dataNodeId) {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache =
-        (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
-    return dataNodeHeartbeatCache == null ? 0d : dataNodeHeartbeatCache.getFreeDiskSpace();
+    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
   }
 
   /**
@@ -767,15 +675,10 @@ public class NodeManager {
    */
   public Optional<TDataNodeLocation> getLowestLoadDataNode() {
     // TODO get real lowest load data node after scoring algorithm being implemented
-    List<TDataNodeConfiguration> targetDataNodeList =
-        filterDataNodeThroughStatus(NodeStatus.Running);
-
-    if (targetDataNodeList == null || targetDataNodeList.isEmpty()) {
-      return Optional.empty();
-    } else {
-      int index = random.nextInt(targetDataNodeList.size());
-      return Optional.of(targetDataNodeList.get(index).location);
-    }
+    int dataNodeId = getLoadManager().getLowestLoadDataNode();
+    return dataNodeId < 0
+        ? Optional.empty()
+        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
   }
 
   /**
@@ -784,22 +687,8 @@ public class NodeManager {
    * @return TDataNodeLocation with the lowest loadScore
    */
   public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
-    AtomicInteger result = new AtomicInteger();
-    AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
-
-    nodes.forEach(
-        nodeID -> {
-          BaseNodeCache cache = nodeCacheMap.get(nodeID);
-          long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore();
-          if (score < lowestLoadScore.get()) {
-            result.set(nodeID);
-            lowestLoadScore.set(score);
-          }
-        });
-
-    LOGGER.info(
-        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
-    return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
+    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
+    return getRegisteredDataNode(dataNodeId).getLocation();
   }
 
   private ConsensusManager getConsensusManager() {
@@ -829,8 +718,4 @@ public class NodeManager {
   private UDFManager getUDFManager() {
     return configManager.getUDFManager();
   }
-
-  private ClusterQuotaManager getClusterQuotaManager() {
-    return configManager.getClusterQuotaManager();
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index fdec1487e6..f15978d6bd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -87,7 +86,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -100,7 +98,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /** The PartitionManager Manages cluster PartitionTable read and write requests. */
@@ -588,12 +585,34 @@ public class PartitionManager {
    * Only leader use this interface.
    *
    * @param database The specified Database
-   * @return All Regions' RegionReplicaSet of the specified StorageGroup
+   * @return All Regions' RegionReplicaSet of the specified Database
    */
   public List<TRegionReplicaSet> getAllReplicaSets(String database) {
     return partitionInfo.getAllReplicaSets(database);
   }
 
+  /**
+   * Get all RegionGroups that belong to the specified DataNode
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    return partitionInfo.getAllReplicaSets(dataNodeId);
+  }
+
+  /**
+   * Only leader use this interface.
+   *
+   * @param database The specified Database
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return The RegionReplicaSets of the specified RegionGroups within the specified Database
+   */
+  public List<TRegionReplicaSet> getReplicaSets(
+      String database, List<TConsensusGroupId> regionGroupIds) {
+    return partitionInfo.getReplicaSets(database, regionGroupIds);
+  }
+
   /**
    * Only leader use this interface.
    *
@@ -666,22 +685,22 @@ public class PartitionManager {
   /**
    * Only leader use this interface.
    *
-   * @param storageGroup StorageGroupName
+   * @param database DatabaseName
    * @param type SchemaRegion or DataRegion
    * @return The specific StorageGroup's Regions that sorted by the number of allocated slots
    * @throws NoAvailableRegionGroupException When all RegionGroups within the specified StorageGroup
    *     are unavailable currently
    */
   public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
-      String storageGroup, TConsensusGroupType type) throws NoAvailableRegionGroupException {
+      String database, TConsensusGroupType type) throws NoAvailableRegionGroupException {
     // Collect static data
     List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
-        partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
+        partitionInfo.getRegionGroupSlotsCounter(database, type);
 
     // Filter RegionGroups that have Disabled status
     List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
     for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) {
-      RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
+      RegionGroupStatus status = getLoadManager().getRegionGroupStatus(slotsCounter.getRight());
       if (!RegionGroupStatus.Disabled.equals(status)) {
         result.add(slotsCounter);
       }
@@ -691,6 +710,9 @@ public class PartitionManager {
       throw new NoAvailableRegionGroupException(type);
     }
 
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap =
+        getLoadManager()
+            .getRegionGroupStatus(result.stream().map(Pair::getRight).collect(Collectors.toList()));
     result.sort(
         (o1, o2) -> {
           // Use the number of partitions as the first priority
@@ -700,8 +722,9 @@ public class PartitionManager {
             return 1;
           } else {
             // Use RegionGroup status as second priority, Running > Available > Discouraged
-            return getRegionGroupStatus(o1.getRight())
-                .compareTo(getRegionGroupStatus(o2.getRight()));
+            return regionGroupStatusMap
+                .get(o1.getRight())
+                .compare(regionGroupStatusMap.get(o2.getRight()));
           }
         });
     return result;
@@ -759,7 +782,8 @@ public class PartitionManager {
         .forEach(
             regionInfo -> {
               regionInfo.setStatus(
-                  getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
+                  getLoadManager()
+                      .getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
                       .getStatus());
 
               String regionType =
@@ -773,6 +797,15 @@ public class PartitionManager {
     return regionInfoListResp;
   }
 
+  /**
+   * Check if the specified RegionGroup exists.
+   *
+   * @param regionGroupId The specified RegionGroup
+   */
+  public boolean isRegionGroupExists(TConsensusGroupId regionGroupId) {
+    return partitionInfo.isRegionGroupExisted(regionGroupId);
+  }
+
   /**
    * update region location
    *
@@ -780,13 +813,6 @@ public class PartitionManager {
    * @return TSStatus
    */
   public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
-    // Remove heartbeat cache if exists
-    if (regionGroupCacheMap.containsKey(req.getRegionId())) {
-      regionGroupCacheMap
-          .get(req.getRegionId())
-          .removeCacheIfExists(req.getOldNode().getDataNodeId());
-    }
-
     return getConsensusManager().write(req).getStatus();
   }
 
@@ -1059,93 +1085,23 @@ public class PartitionManager {
         /* Stop the RegionCleaner service */
         currentRegionMaintainerFuture.cancel(false);
         currentRegionMaintainerFuture = null;
-        regionGroupCacheMap.clear();
         LOGGER.info("RegionCleaner is stopped successfully.");
       }
     }
   }
 
-  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
-    regionGroupCacheMap.remove(consensusGroupId);
-  }
-
   /**
-   * Filter the RegionGroups in the specified StorageGroup through the RegionGroupStatus
+   * Filter the RegionGroups in the specified Database through the RegionGroupStatus
    *
-   * @param storageGroup The specified StorageGroup
+   * @param database The specified Database
    * @param status The specified RegionGroupStatus
-   * @return Filtered RegionGroups with the specific RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
    */
   public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
-      String storageGroup, RegionGroupStatus... status) {
-    return getAllReplicaSets(storageGroup).stream()
-        .filter(
-            regionReplicaSet -> {
-              TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
-              return regionGroupCacheMap.containsKey(regionGroupId)
-                  && Arrays.stream(status)
-                      .anyMatch(
-                          s ->
-                              s.equals(
-                                  regionGroupCacheMap
-                                      .get(regionGroupId)
-                                      .getStatistics()
-                                      .getRegionGroupStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Count the number of cluster Regions with specified RegionStatus
-   *
-   * @param type The specified RegionGroupType
-   * @param status The specified statues
-   * @return The number of cluster Regions with specified RegionStatus
-   */
-  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
-    AtomicInteger result = new AtomicInteger(0);
-    regionGroupCacheMap.forEach(
-        (regionGroupId, regionGroupCache) -> {
-          if (type.equals(regionGroupId.getType())) {
-            regionGroupCache
-                .getStatistics()
-                .getRegionStatisticsMap()
-                .values()
-                .forEach(
-                    regionStatistics -> {
-                      if (Arrays.stream(status)
-                          .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) {
-                        result.getAndIncrement();
-                      }
-                    });
-          }
-        });
-    return result.get();
-  }
-
-  /**
-   * Safely get RegionStatus.
-   *
-   * @param consensusGroupId Specified RegionGroupId
-   * @param dataNodeId Specified RegionReplicaId
-   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
-   */
-  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
-    return regionGroupCacheMap.containsKey(consensusGroupId)
-        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
-        : RegionStatus.Unknown;
-  }
-
-  /**
-   * Safely get RegionGroupStatus.
-   *
-   * @param consensusGroupId Specified RegionGroupId
-   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
-   */
-  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
-    return regionGroupCacheMap.containsKey(consensusGroupId)
-        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
-        : RegionGroupStatus.Disabled;
+      String database, RegionGroupStatus... status) {
+    List<TConsensusGroupId> matchedRegionGroups =
+        getLoadManager().filterRegionGroupThroughStatus(status);
+    return getReplicaSets(database, matchedRegionGroups);
   }
 
   public void getSchemaRegionIds(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 21909d6a5a..f9eff7b0de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -72,10 +72,9 @@ public class PartitionMetrics implements IMetricSet {
       metricService.createAutoGauge(
           Metric.REGION_NUM.toString(),
           MetricLevel.CORE,
-          getPartitionManager(),
-          partitionManager ->
-              partitionManager.countRegionWithSpecifiedStatus(
-                  TConsensusGroupType.SchemaRegion, status),
+          getLoadManager(),
+          loadManager ->
+              loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.SchemaRegion, status),
           Tag.TYPE.toString(),
           TConsensusGroupType.SchemaRegion.toString(),
           Tag.STATUS.toString(),
@@ -85,10 +84,9 @@ public class PartitionMetrics implements IMetricSet {
       metricService.createAutoGauge(
           Metric.REGION_NUM.toString(),
           MetricLevel.CORE,
-          getPartitionManager(),
-          partitionManager ->
-              partitionManager.countRegionWithSpecifiedStatus(
-                  TConsensusGroupType.DataRegion, status),
+          getLoadManager(),
+          loadManager ->
+              loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.DataRegion, status),
           Tag.TYPE.toString(),
           TConsensusGroupType.DataRegion.toString(),
           Tag.STATUS.toString(),
@@ -330,8 +328,8 @@ public class PartitionMetrics implements IMetricSet {
     return configManager.getClusterSchemaManager();
   }
 
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
index 9e58ae6d1c..3cf9976c3d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
@@ -21,20 +21,20 @@ package org.apache.iotdb.confignode.manager.partition;
 public enum RegionGroupStatus {
 
   /** All Regions in RegionGroup are in the Running status */
-  Running("Running"),
+  Running("Running", 1),
 
   /**
    * All Regions in RegionGroup are in the Running or Unknown status, and the number of Regions in
    * the Unknown status is less than half
    */
-  Available("Available"),
+  Available("Available", 2),
 
   /**
    * All Regions in RegionGroup are in the Running, Unknown or ReadOnly status, and at least 1 node
    * is in ReadOnly status, the number of Regions in the Unknown or ReadOnly status is less than
    * half
    */
-  Discouraged("Discouraged"),
+  Discouraged("Discouraged", 3),
 
   /**
    * The following cases will lead to Disabled RegionGroup:
@@ -43,18 +43,24 @@ public enum RegionGroupStatus {
    *
    * <p>2. More than half of the Regions are in Unknown or ReadOnly status
    */
-  Disabled("Disabled");
+  Disabled("Disabled", 4);
 
   private final String status;
+  private final int weight;
 
-  RegionGroupStatus(String status) {
+  RegionGroupStatus(String status, int weight) {
     this.status = status;
+    this.weight = weight;
   }
 
   public String getStatus() {
     return status;
   }
 
+  public int getWeight() {
+    return weight;
+  }
+
   public static RegionGroupStatus parse(String status) {
     for (RegionGroupStatus regionGroupStatus : RegionGroupStatus.values()) {
       if (regionGroupStatus.status.equals(status)) {
@@ -63,4 +69,13 @@ public enum RegionGroupStatus {
     }
     throw new RuntimeException(String.format("RegionGroupStatus %s doesn't exist.", status));
   }
+
+  /**
+   * Compare the weight of two RegionGroupStatus
+   *
+   * <p>Priority: Running > Available > Discouraged > Disabled; higher priority compares as smaller
+   */
+  public int compare(RegionGroupStatus other) {
+    return Integer.compare(this.weight, other.weight);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 67c5526ff8..4c0b1761cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -221,18 +221,6 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
-  /** Return the number of registered ConfigNodes */
-  public int getRegisteredConfigNodeCount() {
-    int result;
-    configNodeInfoReadWriteLock.readLock().lock();
-    try {
-      result = registeredConfigNodes.size();
-    } finally {
-      configNodeInfoReadWriteLock.readLock().unlock();
-    }
-    return result;
-  }
-
   /** Return the number of total cpu cores in online DataNodes */
   public int getTotalCpuCoreCount() {
     int result = 0;
@@ -269,6 +257,23 @@ public class NodeInfo implements SnapshotProcessor {
     }
   }
 
+  /** @return The specified registered DataNodes */
+  public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) {
+    List<TDataNodeConfiguration> result = new ArrayList<>();
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      dataNodeIds.forEach(
+          dataNodeId -> {
+            if (registeredDataNodes.containsKey(dataNodeId)) {
+              result.add(registeredDataNodes.get(dataNodeId).deepCopy());
+            }
+          });
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   /**
    * Update ConfigNodeList both in memory and confignode-system.properties file
    *
@@ -336,6 +341,7 @@ public class NodeInfo implements SnapshotProcessor {
     return status;
   }
 
+  /** @return All registered ConfigNodes */
   public List<TConfigNodeLocation> getRegisteredConfigNodes() {
     List<TConfigNodeLocation> result;
     configNodeInfoReadWriteLock.readLock().lock();
@@ -347,6 +353,23 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
+  /** @return The registered ConfigNodes with the specified ids, skipping unregistered ids */
+  public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) {
+    List<TConfigNodeLocation> result = new ArrayList<>();
+    configNodeInfoReadWriteLock.readLock().lock();
+    try {
+      configNodeIds.forEach(
+          configNodeId -> {
+            if (registeredConfigNodes.containsKey(configNodeId)) {
+              result.add(registeredConfigNodes.get(configNodeId).deepCopy());
+            }
+          });
+    } finally {
+      configNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   public int generateNextNodeId() {
     return nextNodeId.incrementAndGet();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b27a4b529f..4db9a39eb0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -119,8 +119,9 @@ public class DatabasePartitionTable {
 
     return result;
   }
+
   /**
-   * Get all RegionGroups currently owned by this StorageGroup
+   * Get all RegionGroups currently owned by this Database
    *
    * @param type The specified TConsensusGroupType
    * @return Deep copy of all Regions' RegionReplicaSet with the specified TConsensusGroupType
@@ -137,6 +138,37 @@ public class DatabasePartitionTable {
     return result;
   }
 
+  /**
+   * Get all RegionGroups of this Database that belong to the specified DataNode
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    return regionGroupMap.values().stream()
+        .filter(regionGroup -> regionGroup.belongsToDataNode(dataNodeId))
+        .map(RegionGroup::getReplicaSet)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the RegionGroups with the specified RegionGroupIds
+   *
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return Deep copy of the RegionGroups with the specified RegionGroupIds
+   */
+  public List<TRegionReplicaSet> getReplicaSets(List<TConsensusGroupId> regionGroupIds) {
+    List<TRegionReplicaSet> result = new ArrayList<>();
+
+    for (TConsensusGroupId regionGroupId : regionGroupIds) {
+      if (regionGroupMap.containsKey(regionGroupId)) {
+        result.add(regionGroupMap.get(regionGroupId).getReplicaSet());
+      }
+    }
+
+    return result;
+  }
+
   /**
    * Only leader use this interface.
    *
@@ -496,7 +528,7 @@ public class DatabasePartitionTable {
    * @param regionId TConsensusGroupId
    * @return True if contains.
    */
-  public boolean containRegion(TConsensusGroupId regionId) {
+  public boolean containRegionGroup(TConsensusGroupId regionId) {
     return regionGroupMap.containsKey(regionId);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index b19ac834b3..062500012e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -488,6 +488,17 @@ public class PartitionInfo implements SnapshotProcessor {
     return regionResp;
   }
 
+  /**
+   * Check if the specified RegionGroup exists.
+   *
+   * @param regionGroupId The specified RegionGroup
+   */
+  public boolean isRegionGroupExisted(TConsensusGroupId regionGroupId) {
+    return databasePartitionTables.values().stream()
+        .anyMatch(
+            databasePartitionTable -> databasePartitionTable.containRegionGroup(regionGroupId));
+  }
+
   /**
    * Update the location info of given regionId
    *
@@ -500,9 +511,10 @@ public class PartitionInfo implements SnapshotProcessor {
     TDataNodeLocation oldNode = req.getOldNode();
     TDataNodeLocation newNode = req.getNewNode();
     databasePartitionTables.values().stream()
-        .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId))
+        .filter(databasePartitionTable -> databasePartitionTable.containRegionGroup(regionId))
         .forEach(
-            sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode));
+            databasePartitionTable ->
+                databasePartitionTable.updateRegionLocation(regionId, oldNode, newNode));
 
     return status;
   }
@@ -516,7 +528,7 @@ public class PartitionInfo implements SnapshotProcessor {
   public String getRegionStorageGroup(TConsensusGroupId regionId) {
     Optional<DatabasePartitionTable> sgPartitionTableOptional =
         databasePartitionTables.values().stream()
-            .filter(s -> s.containRegion(regionId))
+            .filter(s -> s.containRegionGroup(regionId))
             .findFirst();
     return sgPartitionTableOptional.map(DatabasePartitionTable::getDatabaseName).orElse(null);
   }
@@ -619,6 +631,38 @@ public class PartitionInfo implements SnapshotProcessor {
     }
   }
 
+  /**
+   * Get all RegionGroups related to the specified DataNode
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    List<TRegionReplicaSet> result = new ArrayList<>();
+    databasePartitionTables
+        .values()
+        .forEach(
+            databasePartitionTable ->
+                result.addAll(databasePartitionTable.getAllReplicaSets(dataNodeId)));
+    return result;
+  }
+
+  /**
+   * Get the replica sets of the specified RegionGroups. Only the leader uses this interface.
+   *
+   * @param database The specified Database
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return The matching RegionReplicaSets, or an empty list if the Database doesn't exist
+   */
+  public List<TRegionReplicaSet> getReplicaSets(
+      String database, List<TConsensusGroupId> regionGroupIds) {
+    if (databasePartitionTables.containsKey(database)) {
+      return databasePartitionTables.get(database).getReplicaSets(regionGroupIds);
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
   /**
    * Only leader use this interface.
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 4c4c7ffbc9..a555c2349c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -71,7 +71,7 @@ public class RegionGroup {
   }
 
   public TRegionReplicaSet getReplicaSet() {
-    return replicaSet;
+    return replicaSet.deepCopy();
   }
 
   /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */
@@ -93,6 +93,17 @@ public class RegionGroup {
     return totalTimeSlotCount.get();
   }
 
+  /**
+   * Check if any replica of this RegionGroup is located on the specified DataNode.
+   *
+   * @param dataNodeId The specified DataNodeId.
+   * @return True if the RegionGroup belongs to the specified DataNode.
+   */
+  public boolean belongsToDataNode(int dataNodeId) {
+    return replicaSet.getDataNodeLocations().stream()
+        .anyMatch(dataNodeLocation -> dataNodeLocation.getDataNodeId() == dataNodeId);
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(createTime, outputStream);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 06aef7c2f6..8b925535bf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -49,7 +50,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
@@ -145,8 +145,7 @@ public class ConfigNodeProcedureEnv {
    * @throws TException Thrift IOE
    */
   public boolean invalidateCache(String storageGroupName) throws IOException, TException {
-    NodeManager nodeManager = configManager.getNodeManager();
-    List<TDataNodeConfiguration> allDataNodes = nodeManager.getRegisteredDataNodes();
+    List<TDataNodeConfiguration> allDataNodes = getNodeManager().getRegisteredDataNodes();
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
     invalidateCacheReq.setStorageGroup(true);
     invalidateCacheReq.setFullPath(storageGroupName);
@@ -154,14 +153,14 @@ public class ConfigNodeProcedureEnv {
       int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
 
       // if the node is not alive, sleep 1 second and try again
-      NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+      NodeStatus nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
       if (nodeStatus == NodeStatus.Unknown) {
         try {
           TimeUnit.MILLISECONDS.sleep(1000);
         } catch (InterruptedException e) {
           LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e);
         }
-        nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+        nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
       }
 
       if (nodeStatus == NodeStatus.Running) {
@@ -202,14 +201,11 @@ public class ConfigNodeProcedureEnv {
   }
 
   public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
-    return configManager
-                .getNodeManager()
+    return getNodeManager()
                 .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
                 .size()
             - Boolean.compare(
-                configManager
-                        .getNodeManager()
-                        .getNodeStatusByNodeId(removedDatanode.getDataNodeId())
+                getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
                     != NodeStatus.Unknown,
                 false)
         >= NodeInfo.getMinimumDataNode();
@@ -305,8 +301,6 @@ public class ConfigNodeProcedureEnv {
    * @throws ProcedureException if failed status
    */
   public void stopConfigNode(TConfigNodeLocation tConfigNodeLocation) throws ProcedureException {
-    getNodeManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
-
     TSStatus tsStatus =
         (TSStatus)
             SyncConfigNodeClientPool.getInstance()
@@ -318,6 +312,8 @@ public class ConfigNodeProcedureEnv {
     if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new ProcedureException(tsStatus.getMessage());
     }
+
+    getLoadManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
   }
 
   /**
@@ -371,8 +367,7 @@ public class ConfigNodeProcedureEnv {
    */
   public void markDataNodeAsRemovingAndBroadcast(TDataNodeLocation dataNodeLocation) {
     // Send request to update NodeStatus on the DataNode to be removed
-    if (configManager.getNodeManager().getNodeStatusByNodeId(dataNodeLocation.getDataNodeId())
-        == NodeStatus.Unknown) {
+    if (getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()) == NodeStatus.Unknown) {
       SyncDataNodeClientPool.getInstance()
           .sendSyncRequestToDataNodeWithGivenRetry(
               dataNodeLocation.getInternalEndPoint(),
@@ -388,10 +383,11 @@ public class ConfigNodeProcedureEnv {
     }
 
     // Force updating NodeStatus to Removing
-    getNodeManager()
-        .getNodeCacheMap()
-        .get(dataNodeLocation.getDataNodeId())
-        .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
+    getLoadManager()
+        .forceUpdateNodeCache(
+            NodeType.DataNode,
+            dataNodeLocation.getDataNodeId(),
+            NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
   }
 
   /**
@@ -539,10 +535,7 @@ public class ConfigNodeProcedureEnv {
         (dataNodeId, regionStatus) ->
             heartbeatSampleMap.put(
                 dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus)));
-    getPartitionManager()
-        .getRegionGroupCacheMap()
-        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
-        .forceUpdate(heartbeatSampleMap);
+    getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
 
     // Select leader greedily for iot consensus protocol
     if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index bf1ce7e940..c75fd419b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -328,7 +327,7 @@ public class DataNodeRemoveHandler {
     TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
 
     status =
-        configManager.getNodeManager().getNodeStatusByNodeId(originalDataNode.getDataNodeId())
+        configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
                 == NodeStatus.Unknown
             ? SyncDataNodeClientPool.getInstance()
                 .sendSyncRequestToDataNodeWithGivenRetry(
@@ -375,6 +374,9 @@ public class DataNodeRemoveHandler {
         getIdWithRpcEndpoint(originalDataNode),
         getIdWithRpcEndpoint(destDataNode));
 
+    // Remove the RegionGroupCache of the regionId
+    configManager.getLoadManager().removeRegionGroupCache(regionId);
+
     // Broadcast the latest RegionRouteMap when Region migration finished
     configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
@@ -427,7 +429,7 @@ public class DataNodeRemoveHandler {
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithGivenRetry(
                 dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
-    configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
+    configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId());
     LOGGER.info(
         "{}, Stop Data Node result: {}, stoppedDataNode: {}",
         REMOVE_DATANODE_PROCESS,
@@ -509,9 +511,8 @@ public class DataNodeRemoveHandler {
     if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) {
       for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
         // check whether removed data node is in running state
-        BaseNodeCache nodeCache =
-            configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId());
-        if (!NodeStatus.Running.equals(nodeCache.getNodeStatus())) {
+        if (!NodeStatus.Running.equals(
+            configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()))) {
           removedDataNodes.remove(dataNodeLocation);
           LOGGER.error(
               "Failed to remove data node {} because it is not in running and the configuration of cluster is one replication",
@@ -530,7 +531,7 @@ public class DataNodeRemoveHandler {
             removeDataNodePlan.getDataNodeLocations().stream()
                 .filter(
                     x ->
-                        configManager.getNodeManager().getNodeStatusByNodeId(x.getDataNodeId())
+                        configManager.getLoadManager().getNodeStatus(x.getDataNodeId())
                             != NodeStatus.Unknown)
                 .count();
     if (availableDatanodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index f1271c25b1..43c3404b58 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -120,7 +120,7 @@ public class DeleteDatabaseProcedure
               regionReplicaSet -> {
                 // Clear heartbeat cache along the way
                 env.getConfigManager()
-                    .getPartitionManager()
+                    .getLoadManager()
                     .removeRegionGroupCache(regionReplicaSet.getRegionId());
                 env.getConfigManager()
                     .getLoadManager()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 46b58399ca..e1f02cffb2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -47,6 +47,9 @@ import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 /** Region migrate procedure */
 public class RegionMigrateProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
+
+  // TODO: Reach an agreement on RegionMigrateProcedure
+
   private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index 6da0304f44..372832d106 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -64,7 +64,7 @@ public class GreedyPriorityTest {
           NodeStatus.Running, NodeStatus.Unknown, NodeStatus.Running, NodeStatus.ReadOnly
         };
     for (int i = 0; i < 4; i++) {
-      nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+      nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
       nodeCacheMap
           .get(i)
           .cacheHeartbeatSample(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index f1bf444427..b6248d7872 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -61,7 +61,7 @@ public class LeaderPriorityBalancerTest {
     long currentTimeMillis = System.currentTimeMillis();
     Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
     for (int i = 0; i < 6; i++) {
-      nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+      nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
       if (i != 2 && i != 5) {
         nodeCacheMap
             .get(i)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index b51cf10bcc..56f3e64d26 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -30,7 +30,7 @@ public class NodeCacheTest {
 
   @Test
   public void forceUpdateTest() {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
 
     // Test default
     Assert.assertEquals(NodeStatus.Unknown, dataNodeHeartbeatCache.getNodeStatus());
@@ -55,7 +55,7 @@ public class NodeCacheTest {
 
   @Test
   public void periodicUpdateTest() {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
     long currentTime = System.currentTimeMillis();
     dataNodeHeartbeatCache.cacheHeartbeatSample(
         new NodeHeartbeatSample(