You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/06 14:22:39 UTC

[iotdb] branch master updated: [IOTDB-3711] Update RegionRouteMap based on heartbeat sample (#6590)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 05ffd8893b [IOTDB-3711] Update RegionRouteMap based on heartbeat sample (#6590)
05ffd8893b is described below

commit 05ffd8893bc4683e7bd5ed928f3e6d362ac3d697
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Wed Jul 6 22:22:32 2022 +0800

    [IOTDB-3711] Update RegionRouteMap based on heartbeat sample (#6590)
---
 .../confignode/client/AsyncDataNodeClientPool.java | 19 +++++
 .../handlers/ConfigNodeHeartbeatHandler.java       |  6 +-
 .../client/handlers/DataNodeHeartbeatHandler.java  | 24 ++++--
 .../handlers/UpdateRegionRouteMapHandler.java      | 58 +++++++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |  2 +-
 .../iotdb/confignode/manager/NodeManager.java      |  6 +-
 .../iotdb/confignode/manager/load/LoadManager.java | 98 ++++++++++++++++++----
 .../load/heartbeat/ConfigNodeHeartbeatCache.java   | 40 ++++++---
 .../load/heartbeat/DataNodeHeartbeatCache.java     | 32 ++++---
 .../{IHeartbeatStatistic.java => INodeCache.java}  | 16 ++--
 .../manager/load/heartbeat/IRegionGroupCache.java  | 16 ++--
 ...rtbeatPackage.java => NodeHeartbeatSample.java} |  6 +-
 .../manager/load/heartbeat/RegionGroupCache.java   | 59 +++++++++++--
 ...beatPackage.java => RegionHeartbeatSample.java} | 21 ++++-
 14 files changed, 328 insertions(+), 75 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index c8c0e052f8..4f620f35ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -31,12 +31,14 @@ import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.client.handlers.FlushHandler;
 import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
 import org.apache.iotdb.confignode.client.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -309,6 +311,23 @@ public class AsyncDataNodeClientPool {
     }
   }
 
+  /**
+   * Update the RegionRouteMap cache on specific DataNode
+   *
+   * @param endPoint The specificDataNode
+   */
+  public void updateRegionRouteMap(
+      TEndPoint endPoint, TRegionRouteReq regionRouteReq, UpdateRegionRouteMapHandler handler) {
+    // TODO: Add a retry logic
+    try {
+      clientManager.borrowClient(endPoint).updateRegionCache(regionRouteReq, handler);
+    } catch (IOException e) {
+      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+    } catch (TException e) {
+      LOGGER.error("Update RegionRouteMap on DataNode {} failed", endPoint, e);
+    }
+  }
+
   // TODO: Is the ClientPool must be a singleton?
   private static class ClientPoolHolder {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
index 4a3b6b94f7..d177a13cad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.client.handlers;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
+import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -42,8 +42,8 @@ public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
 
   @Override
   public void onComplete(Long timestamp) {
-    configNodeHeartbeatCache.cacheHeartBeat(
-        new HeartbeatPackage(timestamp, System.currentTimeMillis()));
+    configNodeHeartbeatCache.cacheHeartbeatSample(
+        new NodeHeartbeatSample(timestamp, System.currentTimeMillis()));
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
index 4dcb53cb3b..d6c02d3712 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.confignode.client.handlers;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
 import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -52,21 +53,26 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
 
   @Override
   public void onComplete(THeartbeatResp heartbeatResp) {
-    dataNodeHeartbeatCache.cacheHeartBeat(
-        new HeartbeatPackage(heartbeatResp.getHeartbeatTimestamp(), System.currentTimeMillis()));
+    long receiveTime = System.currentTimeMillis();
 
+    // Update NodeCache
+    dataNodeHeartbeatCache.cacheHeartbeatSample(
+        new NodeHeartbeatSample(heartbeatResp.getHeartbeatTimestamp(), receiveTime));
+
+    // Update RegionCache
     if (heartbeatResp.isSetJudgedLeaders()) {
       heartbeatResp
           .getJudgedLeaders()
           .forEach(
-              (consensusGroupId, isLeader) -> {
-                if (isLeader) {
+              (consensusGroupId, isLeader) ->
                   regionGroupCacheMap
                       .computeIfAbsent(consensusGroupId, empty -> new RegionGroupCache())
-                      .updateLeader(
-                          heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId());
-                }
-              });
+                      .cacheHeartbeatSample(
+                          new RegionHeartbeatSample(
+                              heartbeatResp.getHeartbeatTimestamp(),
+                              receiveTime,
+                              dataNodeLocation.getDataNodeId(),
+                              isLeader)));
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
new file mode 100644
index 0000000000..aa97397943
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+
+public class UpdateRegionRouteMapHandler implements AsyncMethodCallback<TSStatus> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
+
+  private final TDataNodeLocation dataNodeLocation;
+  private final CountDownLatch latch;
+
+  public UpdateRegionRouteMapHandler(TDataNodeLocation dataNodeLocation, CountDownLatch latch) {
+    this.dataNodeLocation = dataNodeLocation;
+    this.latch = latch;
+  }
+
+  @Override
+  public void onComplete(TSStatus status) {
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", dataNodeLocation);
+    } else {
+      LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
+    }
+    latch.countDown();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
+    latch.countDown();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6b8588d924..4758921fc4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -220,7 +220,7 @@ public class ConfigManager implements IManager {
           Comparator.comparingInt(dataNodeInfoLocation -> dataNodeInfoLocation.getDataNodeId()));
       Map<Integer, String> nodeStatus = new HashMap<>();
       getLoadManager()
-          .getHeartbeatCacheMap()
+          .getNodeCacheMap()
           .forEach(
               (nodeId, heartbeatCache) ->
                   nodeStatus.put(nodeId, heartbeatCache.getNodeStatus().getStatus()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 01cea856f1..bac86cbe09 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -187,11 +187,7 @@ public class NodeManager {
             int dataNodeId = dataNodeInfo.getLocation().getDataNodeId();
             tDataNodesLocation.setDataNodeId(dataNodeId);
             tDataNodesLocation.setStatus(
-                getLoadManager()
-                    .getHeartbeatCacheMap()
-                    .get(dataNodeId)
-                    .getNodeStatus()
-                    .getStatus());
+                getLoadManager().getNodeCacheMap().get(dataNodeId).getNodeStatus().getStatus());
             tDataNodesLocation.setRpcAddresss(
                 dataNodeInfo.getLocation().getClientRpcEndPoint().getIp());
             tDataNodesLocation.setRpcPort(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b5ea29f5b0..348744ff8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.client.AsyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
@@ -48,9 +51,10 @@ import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IHeartbeatStatistic;
+import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,9 +63,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -73,14 +79,16 @@ public class LoadManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
 
-  private final IManager configManager;
+  private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+  private static final long heartbeatInterval = conf.getHeartbeatInterval();
+  public static final TEndPoint currentNode =
+      new TEndPoint(conf.getInternalAddress(), conf.getInternalPort());
 
-  private final long heartbeatInterval =
-      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
+  private final IManager configManager;
 
   /** Heartbeat sample cache */
   // Map<NodeId, IHeartbeatStatistic>
-  private final Map<Integer, IHeartbeatStatistic> heartbeatCacheMap;
+  private final Map<Integer, INodeCache> nodeCacheMap;
   // Map<RegionId, RegionGroupCache>
   private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
 
@@ -108,7 +116,7 @@ public class LoadManager {
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
-    this.heartbeatCacheMap = new ConcurrentHashMap<>();
+    this.nodeCacheMap = new ConcurrentHashMap<>();
     this.regionGroupCacheMap = new ConcurrentHashMap<>();
 
     this.regionBalancer = new RegionBalancer(configManager);
@@ -183,7 +191,7 @@ public class LoadManager {
   public Map<Integer, Float> getAllLoadScores() {
     Map<Integer, Float> result = new ConcurrentHashMap<>();
 
-    heartbeatCacheMap.forEach(
+    nodeCacheMap.forEach(
         (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
 
     return result;
@@ -256,7 +264,57 @@ public class LoadManager {
   }
 
   private void updateNodeLoadStatistic() {
-    heartbeatCacheMap.values().forEach(IHeartbeatStatistic::updateLoadStatistic);
+    AtomicBoolean isNeedBroadcast = new AtomicBoolean(false);
+
+    nodeCacheMap
+        .values()
+        .forEach(
+            nodeCache -> {
+              boolean updateResult = nodeCache.updateLoadStatistic();
+              if (conf.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)
+                  && nodeCache instanceof DataNodeHeartbeatCache) {
+                // We need a broadcast when some DataNode fail down
+                isNeedBroadcast.compareAndSet(false, updateResult);
+              }
+            });
+
+    regionGroupCacheMap
+        .values()
+        .forEach(
+            regionGroupCache -> {
+              boolean updateResult = regionGroupCache.updateLoadStatistic();
+              if (conf.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)) {
+                // We need a broadcast when the leadership changed
+                isNeedBroadcast.compareAndSet(false, updateResult);
+              }
+            });
+
+    if (isNeedBroadcast.get()) {
+      broadcastLatestRegionRouteMap();
+    }
+  }
+
+  private void broadcastLatestRegionRouteMap() {
+    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genRealTimeRoutingPolicy();
+    List<TDataNodeInfo> onlineDataNodes = getOnlineDataNodes(-1);
+    CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
+
+    LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
+
+    onlineDataNodes.forEach(
+        dataNodeInfo ->
+            AsyncDataNodeClientPool.getInstance()
+                .updateRegionRouteMap(
+                    dataNodeInfo.getLocation().getInternalEndPoint(),
+                    new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
+                    new UpdateRegionRouteMapHandler(dataNodeInfo.getLocation(), latch)));
+
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      LOGGER.warn("Broadcast the latest RegionRouteMap was interrupted!");
+    }
+    LOGGER.info("Broadcast the latest RegionRouteMap finished.");
   }
 
   private THeartbeatReq genHeartbeatReq() {
@@ -285,7 +343,7 @@ public class LoadManager {
           new DataNodeHeartbeatHandler(
               dataNodeInfo.getLocation(),
               (DataNodeHeartbeatCache)
-                  heartbeatCacheMap.computeIfAbsent(
+                  nodeCacheMap.computeIfAbsent(
                       dataNodeInfo.getLocation().getDataNodeId(),
                       empty -> new DataNodeHeartbeatCache()),
               regionGroupCacheMap);
@@ -301,15 +359,23 @@ public class LoadManager {
    * @param registeredConfigNodes ConfigNodes that registered in cluster
    */
   private void pingRegisteredConfigNodes(List<TConfigNodeLocation> registeredConfigNodes) {
+
     // Send heartbeat requests
     for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+      if (configNodeLocation.getInternalEndPoint().equals(currentNode)) {
+        // Skip itself
+        nodeCacheMap.putIfAbsent(
+            configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
+        continue;
+      }
+
       ConfigNodeHeartbeatHandler handler =
           new ConfigNodeHeartbeatHandler(
               configNodeLocation,
               (ConfigNodeHeartbeatCache)
-                  heartbeatCacheMap.computeIfAbsent(
+                  nodeCacheMap.computeIfAbsent(
                       configNodeLocation.getConfigNodeId(),
-                      empty -> new ConfigNodeHeartbeatCache()));
+                      empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
       AsyncConfigNodeClientPool.getInstance()
           .getConfigNodeHeartBeat(
               configNodeLocation.getInternalEndPoint(),
@@ -324,14 +390,14 @@ public class LoadManager {
    * @param nodeId removed node id
    */
   public void removeNodeHeartbeatHandCache(Integer nodeId) {
-    heartbeatCacheMap.remove(nodeId);
+    nodeCacheMap.remove(nodeId);
   }
 
   public List<TConfigNodeLocation> getOnlineConfigNodes() {
     return getNodeManager().getRegisteredConfigNodes().stream()
         .filter(
             registeredConfigNode ->
-                heartbeatCacheMap
+                nodeCacheMap
                     .get(registeredConfigNode.getConfigNodeId())
                     .getNodeStatus()
                     .equals(NodeStatus.Running))
@@ -342,7 +408,7 @@ public class LoadManager {
     return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
         .filter(
             registeredDataNode ->
-                heartbeatCacheMap
+                nodeCacheMap
                     .get(registeredDataNode.getLocation().getDataNodeId())
                     .getNodeStatus()
                     .equals(NodeStatus.Running))
@@ -365,7 +431,7 @@ public class LoadManager {
     return configManager.getPartitionManager();
   }
 
-  public Map<Integer, IHeartbeatStatistic> getHeartbeatCacheMap() {
-    return heartbeatCacheMap;
+  public Map<Integer, INodeCache> getNodeCacheMap() {
+    return nodeCacheMap;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 039dda5583..597a9f944b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -18,43 +18,52 @@
  */
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 
 import java.util.LinkedList;
 
-public class ConfigNodeHeartbeatCache implements IHeartbeatStatistic {
+public class ConfigNodeHeartbeatCache implements INodeCache {
 
   // Cache heartbeat samples
   private static final int maximumWindowSize = 100;
-  private final LinkedList<HeartbeatPackage> slidingWindow;
+  private final LinkedList<NodeHeartbeatSample> slidingWindow;
+
+  private final TConfigNodeLocation configNodeLocation;
 
   // For showing cluster
   private volatile NodeStatus status;
 
-  public ConfigNodeHeartbeatCache() {
+  public ConfigNodeHeartbeatCache(TConfigNodeLocation configNodeLocation) {
+    this.configNodeLocation = configNodeLocation;
     this.slidingWindow = new LinkedList<>();
-
     this.status = NodeStatus.Running;
   }
 
   @Override
-  public void cacheHeartBeat(HeartbeatPackage newHeartbeat) {
+  public void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample) {
     synchronized (slidingWindow) {
       // Only sequential heartbeats are accepted.
       // And un-sequential heartbeats will be discarded.
       if (slidingWindow.size() == 0
-          || slidingWindow.getLast().getSendTimestamp() < newHeartbeat.getSendTimestamp()) {
-        slidingWindow.add(newHeartbeat);
+          || slidingWindow.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+        slidingWindow.add(newHeartbeatSample);
       }
 
-      while (slidingWindow.size() > maximumWindowSize) {
+      if (slidingWindow.size() > maximumWindowSize) {
         slidingWindow.removeFirst();
       }
     }
   }
 
   @Override
-  public void updateLoadStatistic() {
+  public boolean updateLoadStatistic() {
+    if (configNodeLocation.getInternalEndPoint().equals(LoadManager.currentNode)) {
+      // We don't need to update itself
+      return false;
+    }
+
     long lastSendTime = 0;
     synchronized (slidingWindow) {
       if (slidingWindow.size() > 0) {
@@ -62,12 +71,23 @@ public class ConfigNodeHeartbeatCache implements IHeartbeatStatistic {
       }
     }
 
-    // TODO: Optimize
+    NodeStatus originStatus;
+    switch (status) {
+      case Running:
+        originStatus = NodeStatus.Running;
+        break;
+      case Unknown:
+      default:
+        originStatus = NodeStatus.Unknown;
+    }
+
+    // TODO: Optimize judge logic
     if (System.currentTimeMillis() - lastSendTime > 20_000) {
       status = NodeStatus.Unknown;
     } else {
       status = NodeStatus.Running;
     }
+    return !status.getStatus().equals(originStatus.getStatus());
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index e02023fd24..a836a6d2e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -23,13 +23,13 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import java.util.LinkedList;
 
 /** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
-public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
+public class DataNodeHeartbeatCache implements INodeCache {
 
   // TODO: This class might be split into DataNodeCache and ConfigNodeCache
 
   // Cache heartbeat samples
   private static final int maximumWindowSize = 100;
-  private final LinkedList<HeartbeatPackage> slidingWindow;
+  private final LinkedList<NodeHeartbeatSample> slidingWindow;
 
   // For guiding queries, the higher the score the higher the load
   private volatile float loadScore;
@@ -44,23 +44,23 @@ public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
   }
 
   @Override
-  public void cacheHeartBeat(HeartbeatPackage newHeartbeat) {
+  public void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample) {
     synchronized (slidingWindow) {
-      // Only sequential heartbeats are accepted.
-      // And un-sequential heartbeats will be discarded.
+      // Only sequential HeartbeatSamples are accepted.
+      // And un-sequential HeartbeatSamples will be discarded.
       if (slidingWindow.size() == 0
-          || slidingWindow.getLast().getSendTimestamp() < newHeartbeat.getSendTimestamp()) {
-        slidingWindow.add(newHeartbeat);
+          || slidingWindow.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+        slidingWindow.add(newHeartbeatSample);
       }
 
-      while (slidingWindow.size() > maximumWindowSize) {
+      if (slidingWindow.size() > maximumWindowSize) {
         slidingWindow.removeFirst();
       }
     }
   }
 
   @Override
-  public void updateLoadStatistic() {
+  public boolean updateLoadStatistic() {
     long lastSendTime = 0;
     synchronized (slidingWindow) {
       if (slidingWindow.size() > 0) {
@@ -68,13 +68,23 @@ public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
       }
     }
 
-    // TODO: Optimize
-    loadScore = -lastSendTime;
+    NodeStatus originStatus;
+    switch (status) {
+      case Running:
+        originStatus = NodeStatus.Running;
+        break;
+      case Unknown:
+      default:
+        originStatus = NodeStatus.Unknown;
+    }
+
+    // TODO: Optimize judge logic
     if (System.currentTimeMillis() - lastSendTime > 20_000) {
       status = NodeStatus.Unknown;
     } else {
       status = NodeStatus.Running;
     }
+    return !status.getStatus().equals(originStatus.getStatus());
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
similarity index 77%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
index 413f54792a..68de3916e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
@@ -21,17 +21,21 @@ package org.apache.iotdb.confignode.manager.load.heartbeat;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 
 /** All the statistic interfaces that provided by HeartbeatCache */
-public interface IHeartbeatStatistic {
+public interface INodeCache {
 
   /**
-   * Cache the newest HeartbeatPackage
+   * Cache the newest HeartbeatSample
    *
-   * @param newHeartbeat The newest HeartbeatData
+   * @param newHeartbeatSample The newest HeartbeatSample
    */
-  void cacheHeartBeat(HeartbeatPackage newHeartbeat);
+  void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample);
 
-  /** Invoking periodically to update node load statistics */
-  void updateLoadStatistic();
+  /**
+   * Invoking periodically to update Nodes' load statistics
+   *
+   * @return true if some load statistic changed
+   */
+  boolean updateLoadStatistic();
 
   /** @return The latest load score of a node, the higher the score the higher the load */
   float getLoadScore();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
index 7283665dc5..3c13b7e855 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
@@ -21,17 +21,23 @@ package org.apache.iotdb.confignode.manager.load.heartbeat;
 public interface IRegionGroupCache {
 
   /**
-   * Update RegionGroup's latest leader
+   * Cache the newest HeartbeatSample
    *
-   * @param timestamp Judging timestamp
-   * @param dataNodeId Leader location
+   * @param newHeartbeatSample The newest HeartbeatSample
    */
-  void updateLeader(long timestamp, int dataNodeId);
+  void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample);
+
+  /**
+   * Invoking periodically to update RegionGroups' load statistics
+   *
+   * @return true if some load statistic changed
+   */
+  boolean updateLoadStatistic();
 
   /**
    * Get RegionGroup's latest leader
    *
-   * @return The DataNodeId of latest leader
+   * @return The DataNodeId of the latest leader
    */
   int getLeaderDataNodeId();
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/NodeHeartbeatSample.java
similarity index 89%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/NodeHeartbeatSample.java
index 66327611aa..509642c2fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/NodeHeartbeatSample.java
@@ -18,13 +18,15 @@
  */
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
-public class HeartbeatPackage {
+public class NodeHeartbeatSample {
 
   // Unit: ms
   private final long sendTimestamp;
   private final long receiveTimestamp;
 
-  public HeartbeatPackage(long sendTimestamp, long receiveTimestamp) {
+  // TODO: Add load sample
+
+  public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
     this.sendTimestamp = sendTimestamp;
     this.receiveTimestamp = receiveTimestamp;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index 6ec938ccbc..ce799864a6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -18,26 +18,75 @@
  */
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class RegionGroupCache implements IRegionGroupCache {
 
   // TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache
 
-  private long timestamp;
+  private static final int maximumWindowSize = 100;
+  // Map<DataNodeId(where a RegionReplica resides), LinkedList<RegionHeartbeatSample>>
+  private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
 
+  // Indicates the version of the statistics
+  private final AtomicLong versionTimestamp;
+  // The DataNode where the leader resides
   private final AtomicInteger leaderDataNodeId;
 
   public RegionGroupCache() {
+    this.slidingWindow = new ConcurrentHashMap<>();
+
+    this.versionTimestamp = new AtomicLong(0);
     this.leaderDataNodeId = new AtomicInteger(-1);
   }
 
   @Override
-  public synchronized void updateLeader(long timestamp, int dataNodeId) {
-    if (timestamp > this.timestamp) {
-      this.timestamp = timestamp;
-      this.leaderDataNodeId.set(dataNodeId);
+  public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample) {
+    slidingWindow.putIfAbsent(newHeartbeatSample.getBelongedDataNodeId(), new LinkedList<>());
+    synchronized (slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId())) {
+      LinkedList<RegionHeartbeatSample> samples =
+          slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId());
+
+      // Only sequential HeartbeatSamples are accepted.
+      // And un-sequential HeartbeatSamples will be discarded.
+      if (samples.size() == 0
+          || samples.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+        samples.add(newHeartbeatSample);
+      }
+
+      if (samples.size() > maximumWindowSize) {
+        samples.removeFirst();
+      }
+    }
+  }
+
+  @Override
+  public boolean updateLoadStatistic() {
+    long updateVersion = Long.MIN_VALUE;
+    int updateLeaderDataNodeId = -1;
+
+    synchronized (slidingWindow) {
+      for (LinkedList<RegionHeartbeatSample> samples : slidingWindow.values()) {
+        if (samples.size() > 0) {
+          RegionHeartbeatSample lastSample = samples.getLast();
+          if (lastSample.getSendTimestamp() > updateVersion && lastSample.isLeader()) {
+            updateVersion = lastSample.getSendTimestamp();
+            updateLeaderDataNodeId = lastSample.getBelongedDataNodeId();
+          }
+        }
+      }
+    }
+
+    if (updateVersion > versionTimestamp.get()) {
+      versionTimestamp.set(updateVersion);
+      leaderDataNodeId.set(updateLeaderDataNodeId);
+      return true;
     }
+    return false;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
similarity index 71%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
index 66327611aa..8631c2edbf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
@@ -18,15 +18,24 @@
  */
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
-public class HeartbeatPackage {
+public class RegionHeartbeatSample {
 
   // Unit: ms
   private final long sendTimestamp;
   private final long receiveTimestamp;
 
-  public HeartbeatPackage(long sendTimestamp, long receiveTimestamp) {
+  private final int belongedDataNodeId;
+  private final boolean isLeader;
+
+  // TODO: Add load sample
+
+  public RegionHeartbeatSample(
+      long sendTimestamp, long receiveTimestamp, int belongedDataNodeId, boolean isLeader) {
     this.sendTimestamp = sendTimestamp;
     this.receiveTimestamp = receiveTimestamp;
+
+    this.belongedDataNodeId = belongedDataNodeId;
+    this.isLeader = isLeader;
   }
 
   public long getSendTimestamp() {
@@ -36,4 +45,12 @@ public class HeartbeatPackage {
   public long getReceiveTimestamp() {
     return receiveTimestamp;
   }
+
+  public int getBelongedDataNodeId() {
+    return belongedDataNodeId;
+  }
+
+  public boolean isLeader() {
+    return isLeader;
+  }
 }