You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/04/19 03:01:41 UTC

[iotdb] branch Construct-Cluster-LoadPublisher-Thread-and-IClusterStatusSubscriber created (now 16a3e4d5dd)

This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a change to branch Construct-Cluster-LoadPublisher-Thread-and-IClusterStatusSubscriber
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 16a3e4d5dd Finish

This branch includes the following new commits:

     new 16a3e4d5dd Finish

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Finish

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Construct-Cluster-LoadPublisher-Thread-and-IClusterStatusSubscriber
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16a3e4d5dd78ee6cf06791c690c0725f46c4f092
Author: YongzaoDan <53...@qq.com>
AuthorDate: Wed Apr 19 11:01:30 2023 +0800

    Finish
---
 .../confignode/manager/ClusterSchemaManager.java   |  15 ---
 .../iotdb/confignode/manager/load/LoadManager.java |   8 +-
 .../manager/load/balancer/RouteBalancer.java       |  19 ++--
 .../load/balancer/router/RegionRouteMap.java       |   5 +
 .../confignode/manager/load/cache/LoadCache.java   |  15 ++-
 .../load/cache/region/RegionGroupCache.java        |   4 +
 .../manager/load/service/StatisticsService.java    | 115 ++++++++++++---------
 .../subscriber/IClusterStatusSubscriber.java}      |  17 ++-
 .../manager/load/subscriber/RouteChangeEvent.java  |  74 +++++++++++++
 .../subscriber/StatisticsChangeEvent.java}         |  26 +++--
 10 files changed, 210 insertions(+), 88 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 1ac6823635..5bc25f5d71 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoRe
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
@@ -81,8 +80,6 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -806,18 +803,6 @@ public class ClusterSchemaManager {
     return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus();
   }
 
-  /**
-   * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
-   * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
-   *
-   * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
-   */
-  @Subscribe
-  @AllowConcurrentEvents
-  public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
-    // TODO
-  }
-
   private NodeManager getNodeManager() {
     return configManager.getNodeManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 2aaa730a8a..b6f635a11b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -72,8 +72,8 @@ public class LoadManager {
   private final HeartbeatService heartbeatService;
   private final StatisticsService statisticsService;
 
-  private final EventBus eventBus =
-      new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
+  private final EventBus loadPublisher =
+      new AsyncEventBus("Cluster-LoadPublisher-Thread", Executors.newFixedThreadPool(5));
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
@@ -85,9 +85,9 @@ public class LoadManager {
     this.loadCache = new LoadCache();
     this.heartbeatService = new HeartbeatService(configManager, loadCache);
     this.statisticsService =
-        new StatisticsService(configManager, routeBalancer, loadCache, eventBus);
+        new StatisticsService(configManager, routeBalancer, loadCache, loadPublisher);
 
-    eventBus.register(configManager.getClusterSchemaManager());
+    loadPublisher.register(statisticsService);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index a87cb051ba..31ea24a50f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFl
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -165,15 +166,18 @@ public class RouteBalancer {
   /**
    * Invoking periodically to update the RegionRouteMap
    *
-   * @return True if the RegionRouteMap has changed, false otherwise
+   * @return RouteChangeEvent
    */
-  public boolean updateRegionRouteMap() {
+  public RouteChangeEvent updateRegionRouteMap() {
     synchronized (regionRouteMap) {
-      return updateRegionLeaderMap() | updateRegionPriorityMap();
+      RegionRouteMap preRouteMap = new RegionRouteMap(regionRouteMap);
+      updateRegionLeaderMap();
+      updateRegionPriorityMap();
+      return new RouteChangeEvent(preRouteMap, regionRouteMap);
     }
   }
 
-  private boolean updateRegionLeaderMap() {
+  private void updateRegionLeaderMap() {
     AtomicBoolean isLeaderChanged = new AtomicBoolean(false);
     leaderCache.forEach(
         (regionGroupId, leadershipSample) -> {
@@ -189,10 +193,10 @@ public class RouteBalancer {
             isLeaderChanged.set(true);
           }
         });
-    return isLeaderChanged.get();
+    isLeaderChanged.get();
   }
 
-  private boolean updateRegionPriorityMap() {
+  private void updateRegionPriorityMap() {
     Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap();
     Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();
 
@@ -211,9 +215,6 @@ public class RouteBalancer {
 
     if (!latestRegionPriorityMap.equals(regionRouteMap.getRegionPriorityMap())) {
       regionRouteMap.setRegionPriorityMap(latestRegionPriorityMap);
-      return true;
-    } else {
-      return false;
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
index 58f456ab8f..cbe9003355 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
@@ -50,6 +50,11 @@ public class RegionRouteMap {
     this.regionPriorityMap = new ConcurrentHashMap<>();
   }
 
+  public RegionRouteMap(RegionRouteMap other) {
+    this.regionLeaderMap = new ConcurrentHashMap<>(other.regionLeaderMap);
+    this.regionPriorityMap = new ConcurrentHashMap<>(other.regionPriorityMap);
+  }
+
   /**
    * @return DataNodeId where the specified RegionGroup's leader resides. And return -1 if the
    *     leader is not recorded yet
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 0b2fd7195c..5a487de11c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -165,7 +165,7 @@ public class LoadCache {
           if (nodeCache.periodicUpdate()) {
             // Update and record the changed NodeStatistics
             differentNodeStatisticsMap.put(
-                nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
+                nodeId, new Pair<>(preNodeStatistics, nodeCache.getStatistics()));
           }
         });
     return differentNodeStatisticsMap;
@@ -176,14 +176,19 @@ public class LoadCache {
    *
    * @return a map of changed RegionGroupStatistics
    */
-  public Map<TConsensusGroupId, RegionGroupStatistics> updateRegionGroupStatistics() {
-    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
-        new ConcurrentHashMap<>();
+  public Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+      updateRegionGroupStatistics() {
+    Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+        differentRegionGroupStatisticsMap = new ConcurrentHashMap<>();
     regionGroupCacheMap.forEach(
         (regionGroupId, regionGroupCache) -> {
+          RegionGroupStatistics preRegionGroupStatistics =
+              regionGroupCache.getPreviousStatistics().deepCopy();
           if (regionGroupCache.periodicUpdate()) {
             // Update and record the changed RegionGroupStatistics
-            differentRegionGroupStatisticsMap.put(regionGroupId, regionGroupCache.getStatistics());
+            differentRegionGroupStatisticsMap.put(
+                regionGroupId,
+                new Pair<>(preRegionGroupStatistics, regionGroupCache.getStatistics()));
           }
         });
     return differentRegionGroupStatisticsMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index a531e440aa..dd21c24721 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -163,4 +163,8 @@ public class RegionGroupCache {
   public RegionGroupStatistics getStatistics() {
     return currentStatistics.get();
   }
+
+  public RegionGroupStatistics getPreviousStatistics() {
+    return previousStatistics.get();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
index 9e51b153de..bdbb24cc8e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -32,12 +32,13 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
+import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
+import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -52,7 +53,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-public class StatisticsService {
+public class StatisticsService implements IClusterStatusSubscriber {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class);
 
@@ -111,28 +112,32 @@ public class StatisticsService {
     boolean isNeedBroadcast = false;
 
     // Update NodeStatistics:
-    // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
-    // means the previous NodeStatistics
+    // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>>
     Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
         loadCache.updateNodeStatistics();
     if (!differentNodeStatisticsMap.isEmpty()) {
       isNeedBroadcast = true;
-      recordNodeStatistics(differentNodeStatisticsMap);
-      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
     }
 
     // Update RegionGroupStatistics
-    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
-        loadCache.updateRegionGroupStatistics();
+    // Map<RegionGroupId, Pair<old RegionGroupStatistics, new RegionGroupStatistics>>
+    Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+        differentRegionGroupStatisticsMap = loadCache.updateRegionGroupStatistics();
     if (!differentRegionGroupStatisticsMap.isEmpty()) {
       isNeedBroadcast = true;
-      recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+    }
+
+    if (isNeedBroadcast) {
+      StatisticsChangeEvent statisticsChangeEvent =
+          new StatisticsChangeEvent(differentNodeStatisticsMap, differentRegionGroupStatisticsMap);
+      eventBus.post(statisticsChangeEvent);
     }
 
     // Update RegionRouteMap
-    if (routeBalancer.updateRegionRouteMap()) {
+    RouteChangeEvent routeChangeEvent = routeBalancer.updateRegionRouteMap();
+    if (routeChangeEvent.isNeedBroadcast()) {
       isNeedBroadcast = true;
-      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
+      eventBus.post(routeChangeEvent);
     }
 
     if (isNeedBroadcast) {
@@ -140,6 +145,31 @@ public class StatisticsService {
     }
   }
 
+  public void broadcastLatestRegionRouteMap() {
+    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
+        routeBalancer.getLatestRegionPriorityMap();
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
+    configManager
+        .getNodeManager()
+        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
+        .forEach(
+            onlineDataNode ->
+                dataNodeLocationMap.put(
+                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
+
+    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
+    long broadcastTime = System.currentTimeMillis();
+
+    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+            dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+  }
+
   private void recordNodeStatistics(
       Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
     LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
@@ -148,19 +178,20 @@ public class StatisticsService {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           "nodeId{" + nodeCacheEntry.getKey() + "}",
-          nodeCacheEntry.getValue().left);
+          nodeCacheEntry.getValue().getRight());
     }
   }
 
   private void recordRegionGroupStatistics(
-      Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
+      Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+          differentRegionGroupStatisticsMap) {
     LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
-    for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
-        differentRegionGroupStatisticsMap.entrySet()) {
+    for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+        regionGroupStatisticsEntry : differentRegionGroupStatisticsMap.entrySet()) {
       LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
       LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
       for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
-          regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+          regionGroupStatisticsEntry.getValue().getRight().getRegionStatisticsMap().entrySet()) {
         LOGGER.info(
             "[UpdateLoadStatistics]\t dataNodeId{}={}",
             regionStatisticsEntry.getKey(),
@@ -169,50 +200,40 @@ public class StatisticsService {
     }
   }
 
-  private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+  @Override
+  public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
+    recordNodeStatistics(event.getNodeStatisticsMap());
+    recordRegionGroupStatistics(event.getRegionGroupStatisticsMap());
+  }
+
+  private void recordRegionLeaderMap(Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) {
     LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
-    for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
-        regionRouteMap.getRegionLeaderMap().entrySet()) {
+    for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> regionLeaderEntry :
+        leaderMap.entrySet()) {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           regionLeaderEntry.getKey(),
-          regionLeaderEntry.getValue());
+          regionLeaderEntry.getValue().getRight());
     }
+  }
 
+  private void recordRegionPriorityMap(
+      Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> priorityMap) {
     LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
-    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
-        regionRouteMap.getRegionPriorityMap().entrySet()) {
+    for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
+        regionPriorityEntry : priorityMap.entrySet()) {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           regionPriorityEntry.getKey(),
-          regionPriorityEntry.getValue().getDataNodeLocations().stream()
+          regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
               .map(TDataNodeLocation::getDataNodeId)
               .collect(Collectors.toList()));
     }
   }
 
-  public void broadcastLatestRegionRouteMap() {
-    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
-        routeBalancer.getLatestRegionPriorityMap();
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
-    configManager
-        .getNodeManager()
-        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
-        .forEach(
-            onlineDataNode ->
-                dataNodeLocationMap.put(
-                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
-
-    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
-    long broadcastTime = System.currentTimeMillis();
-
-    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(
-            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
-            dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+  @Override
+  public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
+    recordRegionLeaderMap(event.getLeaderMap());
+    recordRegionPriorityMap(event.getPriorityMap());
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
similarity index 66%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
index d9e8445e74..faa79fb5a5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
@@ -16,6 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.observer;
 
-public interface IEvent {}
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+
+public interface IClusterStatusSubscriber {
+
+  @Subscribe
+  @AllowConcurrentEvents
+  void onClusterStatisticsChanged(StatisticsChangeEvent event);
+
+  @Subscribe
+  @AllowConcurrentEvents
+  void onRegionGroupLeaderChanged(RouteChangeEvent event);
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java
new file mode 100644
index 0000000000..55153f3bf5
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RouteChangeEvent {
+
+  // Map<RegionGroupId, Pair<old Leader, new Leader>>
+  private final Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap;
+  // Map<RegionGroupId, Pair<old Priority, new Priority>>
+  private final Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> priorityMap;
+
+  public RouteChangeEvent(RegionRouteMap preRouteMap, RegionRouteMap currentRouteMap) {
+    this.leaderMap = new ConcurrentHashMap<>();
+    this.priorityMap = new ConcurrentHashMap<>();
+
+    preRouteMap
+        .getRegionLeaderMap()
+        .forEach(
+            (regionGroupId, oldLeader) -> {
+              Integer newLeader = currentRouteMap.getRegionLeaderMap().get(regionGroupId);
+              if (newLeader != null && !newLeader.equals(oldLeader)) {
+                leaderMap.put(regionGroupId, new Pair<>(oldLeader, newLeader));
+              }
+            });
+
+    preRouteMap
+        .getRegionPriorityMap()
+        .forEach(
+            (regionGroupId, oldPriority) -> {
+              TRegionReplicaSet newPriority =
+                  currentRouteMap.getRegionPriorityMap().get(regionGroupId);
+              if (newPriority != null && !newPriority.equals(oldPriority)) {
+                priorityMap.put(regionGroupId, new Pair<>(oldPriority, newPriority));
+              }
+            });
+  }
+
+  public boolean isNeedBroadcast() {
+    return !leaderMap.isEmpty() || !priorityMap.isEmpty();
+  }
+
+  public Map<TConsensusGroupId, Pair<Integer, Integer>> getLeaderMap() {
+    return leaderMap;
+  }
+
+  public Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> getPriorityMap() {
+    return priorityMap;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
similarity index 51%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
index 50e9023040..c8ba7b6248 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
@@ -16,24 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.observer;
 
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.Map;
 
-public class NodeStatisticsEvent implements IEvent {
+public class StatisticsChangeEvent {
 
-  // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one means
-  // the previous NodeStatistics
-  private Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap;
+  // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>>
+  private final Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap;
+  // Map<RegionGroupId, Pair<old RegionGroupStatistics, new RegionGroupStatistics>>
+  private final Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+      regionGroupStatisticsMap;
 
-  public NodeStatisticsEvent(Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap) {
+  public StatisticsChangeEvent(
+      Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap,
+      Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+          regionGroupStatisticsMap) {
     this.nodeStatisticsMap = nodeStatisticsMap;
+    this.regionGroupStatisticsMap = regionGroupStatisticsMap;
   }
 
   public Map<Integer, Pair<NodeStatistics, NodeStatistics>> getNodeStatisticsMap() {
     return nodeStatisticsMap;
   }
+
+  public Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+      getRegionGroupStatisticsMap() {
+    return regionGroupStatisticsMap;
+  }
 }