You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/04/13 14:05:31 UTC
[iotdb] 01/03: stash for classes move
This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch Move-heartbeat-thread-and-statistics-thread-to-LoadManager
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58a2e633a6bec7af2fe41c8622e61c4be74b9131
Author: YongzaoDan <53...@qq.com>
AuthorDate: Wed Apr 12 11:09:21 2023 +0800
stash for classes move
---
.../heartbeat/ConfigNodeHeartbeatHandler.java | 18 +-
.../heartbeat/DataNodeHeartbeatHandler.java | 58 +++----
.../statemachine/ConfigRegionStateMachine.java | 4 +-
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../iotdb/confignode/manager/ProcedureManager.java | 2 +-
.../confignode/manager/RetryFailedTasksThread.java | 4 +-
.../iotdb/confignode/manager/load/LoadManager.java | 42 +++--
.../manager/load/balancer/RouteBalancer.java | 30 +---
.../load/heartbeat/HeartbeatSampleCache.java | 123 +++++++++++++
.../manager/load/heartbeat/HeartbeatService.java | 190 +++++++++++++++++++++
.../heartbeat/node}/BaseNodeCache.java | 3 +-
.../heartbeat/node}/ConfigNodeHeartbeatCache.java | 3 +-
.../heartbeat/node}/DataNodeHeartbeatCache.java | 3 +-
.../heartbeat/node}/NodeHeartbeatSample.java | 2 +-
.../heartbeat/region}/RegionCache.java | 7 +-
.../heartbeat/region}/RegionGroupCache.java | 4 +-
.../heartbeat/region}/RegionHeartbeatSample.java | 2 +-
.../statistics}/NodeStatistics.java | 3 +-
.../statistics}/RegionGroupStatistics.java | 2 +-
.../statistics}/RegionStatistics.java | 3 +-
.../statistics/StatisticsService.java} | 30 +---
.../iotdb/confignode/manager/node/NodeManager.java | 146 +---------------
.../manager/observer/NodeStatisticsEvent.java | 2 +-
.../manager/partition/PartitionManager.java | 10 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 6 +-
.../procedure/env/DataNodeRemoveHandler.java | 2 +-
.../router/priority/GreedyPriorityTest.java | 6 +-
.../priority/LeaderPriorityBalancerTest.java | 6 +-
.../confignode/manager/node/NodeCacheTest.java | 4 +-
.../manager/partition/RegionGroupCacheTest.java | 4 +-
.../persistence/node/NodeStatisticsTest.java | 2 +-
.../statistics/RegionGroupStatisticsTest.java | 4 +-
.../partition/statistics/RegionStatisticsTest.java | 2 +-
33 files changed, 427 insertions(+), 302 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index c97f3d8bb4..92881210e7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -18,24 +18,26 @@
*/
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.thrift.async.AsyncMethodCallback;
public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
- // Update ConfigNodeHeartbeatCache when success
- private final ConfigNodeHeartbeatCache configNodeHeartbeatCache;
+ private final int nodeId;
+ private final HeartbeatSampleCache cache;
- public ConfigNodeHeartbeatHandler(ConfigNodeHeartbeatCache configNodeHeartbeatCache) {
- this.configNodeHeartbeatCache = configNodeHeartbeatCache;
+ public ConfigNodeHeartbeatHandler(int nodeId, HeartbeatSampleCache cache) {
+ this.nodeId = nodeId;
+ this.cache = cache;
}
@Override
public void onComplete(Long timestamp) {
- configNodeHeartbeatCache.cacheHeartbeatSample(
- new NodeHeartbeatSample(timestamp, System.currentTimeMillis()));
+ long receiveTime = System.currentTimeMillis();
+ cache.cacheConfigNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index fe1af63540..bea4455313 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatSampleCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -35,27 +35,23 @@ import java.util.Map;
public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
- // Update DataNodeHeartbeatCache when success
- private final TDataNodeLocation dataNodeLocation;
- private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
- private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
- private final RouteBalancer routeBalancer;
+ private final int nodeId;
+
+ private final HeartbeatSampleCache heartbeatSampleCache;
+
private final Map<Integer, Long> deviceNum;
private final Map<Integer, Long> timeSeriesNum;
private final Map<Integer, Long> regionDisk;
public DataNodeHeartbeatHandler(
- TDataNodeLocation dataNodeLocation,
- DataNodeHeartbeatCache dataNodeHeartbeatCache,
- Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
- RouteBalancer routeBalancer,
+ int nodeId,
+ HeartbeatSampleCache heartbeatSampleCache,
Map<Integer, Long> deviceNum,
Map<Integer, Long> timeSeriesNum,
Map<Integer, Long> regionDisk) {
- this.dataNodeLocation = dataNodeLocation;
- this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
- this.regionGroupCacheMap = regionGroupCacheMap;
- this.routeBalancer = routeBalancer;
+
+ this.nodeId = nodeId;
+ this.heartbeatSampleCache = heartbeatSampleCache;
this.deviceNum = deviceNum;
this.timeSeriesNum = timeSeriesNum;
this.regionDisk = regionDisk;
@@ -66,31 +62,29 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
long receiveTime = System.currentTimeMillis();
// Update NodeCache
- dataNodeHeartbeatCache.cacheHeartbeatSample(
- new NodeHeartbeatSample(heartbeatResp, receiveTime));
+ heartbeatSampleCache.cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime));
- // Update RegionGroupCache And leaderCache
heartbeatResp
.getJudgedLeaders()
.forEach(
(regionGroupId, isLeader) -> {
- regionGroupCacheMap
- .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
- .cacheHeartbeatSample(
- dataNodeLocation.getDataNodeId(),
- new RegionHeartbeatSample(
- heartbeatResp.getHeartbeatTimestamp(),
- receiveTime,
- // Region will inherit DataNode's status
- RegionStatus.parse(heartbeatResp.getStatus())));
+ // Update RegionGroupCache
+ heartbeatSampleCache.cacheRegionHeartbeatSample(regionGroupId, nodeId,
+ new RegionHeartbeatSample(
+ heartbeatResp.getHeartbeatTimestamp(),
+ receiveTime,
+ // Region will inherit DataNode's status
+ RegionStatus.parse(heartbeatResp.getStatus())));
if (isLeader) {
- routeBalancer.cacheLeaderSample(
+ // Update leaderCache
+ heartbeatSampleCache.cacheLeaderSample(
regionGroupId,
new Pair<>(
- heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
+ heartbeatResp.getHeartbeatTimestamp(), nodeId));
}
});
+
if (heartbeatResp.getDeviceNum() != null) {
deviceNum.putAll(heartbeatResp.getDeviceNum());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 9e93061cf3..78282c4fa3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -209,10 +209,10 @@ public class ConfigRegionStateMachine
// Start leader scheduling services
configManager.getProcedureManager().shiftExecutor(true);
+ configManager.getLoadManager().startHeartbeatService();
configManager.getLoadManager().startLoadStatisticsService();
configManager.getLoadManager().getRouteBalancer().startRouteBalancingService();
configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
- configManager.getNodeManager().startHeartbeatService();
configManager.getPartitionManager().startRegionCleaner();
// we do cq recovery async for two reasons:
@@ -230,10 +230,10 @@ public class ConfigRegionStateMachine
// Stop leader scheduling services
configManager.getProcedureManager().shiftExecutor(false);
+ configManager.getLoadManager().stopHeartbeatService();
configManager.getLoadManager().stopLoadStatisticsService();
configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService();
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
- configManager.getNodeManager().stopHeartbeatService();
configManager.getPartitionManager().stopRegionCleaner();
configManager.getCQManager().stopCQScheduler();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 96ea55f134..cf038e6dba 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -88,7 +88,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.node.NodeMetrics;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 63a78d36c1..f9ac0230fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -41,7 +41,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 97666a005b..3dab05219d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -55,6 +55,8 @@ import java.util.concurrent.TimeUnit;
*/
public class RetryFailedTasksThread {
+ // TODO: Replace this class by cluster events
+
private static final Logger LOGGER = LoggerFactory.getLogger(RetryFailedTasksThread.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index a19c942d25..4b18d37a0a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -32,23 +33,24 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatService;
+import org.apache.iotdb.confignode.manager.load.statistics.StatisticsService;
import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -76,9 +78,6 @@ public class LoadManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
- private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
- private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
-
private final IManager configManager;
/** Balancers */
@@ -87,12 +86,16 @@ public class LoadManager {
private final PartitionBalancer partitionBalancer;
private final RouteBalancer routeBalancer;
+ /** Cluster load services */
+ private final HeartbeatService heartbeatService;
+ private final StatisticsService statisticsService;
+
+
/** Load statistics executor service */
+ private final Object statisticsScheduleMonitor = new Object();
private Future<?> currentLoadStatisticsFuture;
-
private final ScheduledExecutorService loadStatisticsExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
- private final Object scheduleMonitor = new Object();
private final EventBus eventBus =
new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
@@ -104,6 +107,9 @@ public class LoadManager {
this.partitionBalancer = new PartitionBalancer(configManager);
this.routeBalancer = new RouteBalancer(configManager);
+ this.heartbeatService = new HeartbeatService(configManager);
+ this.statisticsService = new StatisticsService(configManager);
+
eventBus.register(configManager.getClusterSchemaManager());
eventBus.register(configManager.getSyncManager());
}
@@ -118,8 +124,8 @@ public class LoadManager {
* @throws DatabaseNotExistsException If some specific StorageGroups don't exist
*/
public CreateRegionGroupsPlan allocateRegionGroups(
- Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
- throws NotEnoughDataNodeException, DatabaseNotExistsException {
+ Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
+ throws NotEnoughDataNodeException, DatabaseNotExistsException {
return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
}
@@ -130,8 +136,8 @@ public class LoadManager {
* @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
*/
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
- throws NoAvailableRegionGroupException {
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+ throws NoAvailableRegionGroupException {
return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
}
@@ -142,8 +148,8 @@ public class LoadManager {
* @return Map<StorageGroupName, DataPartitionTable>, the allocating result
*/
public Map<String, DataPartitionTable> allocateDataPartition(
- Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
- throws NoAvailableRegionGroupException {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
+ throws NoAvailableRegionGroupException {
return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
}
@@ -185,7 +191,7 @@ public class LoadManager {
/** Start the load statistics service */
public void startLoadStatisticsService() {
- synchronized (scheduleMonitor) {
+ synchronized (statisticsScheduleMonitor) {
if (currentLoadStatisticsFuture == null) {
currentLoadStatisticsFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
@@ -201,7 +207,7 @@ public class LoadManager {
/** Stop the load statistics service */
public void stopLoadStatisticsService() {
- synchronized (scheduleMonitor) {
+ synchronized (statisticsScheduleMonitor) {
if (currentLoadStatisticsFuture != null) {
currentLoadStatisticsFuture.cancel(false);
currentLoadStatisticsFuture = null;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 31f28397fb..025d29e6fe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -93,12 +93,6 @@ public class RouteBalancer {
private final IManager configManager;
- // Key: RegionGroupId
- // Value: Pair<Timestamp, LeaderDataNodeId>, where
- // the left value stands for sampling timestamp
- // and the right value stands for the index of DataNode that leader resides.
- private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
-
/** RegionRouteMap */
private final RegionRouteMap regionRouteMap;
// For generating optimal RegionLeaderMap
@@ -107,6 +101,7 @@ public class RouteBalancer {
private final IPriorityBalancer priorityRouter;
/** Leader Balancing service */
+ // TODO: leader balancing should be triggered by cluster events
private Future<?> currentLeaderBalancingFuture;
private final ScheduledExecutorService leaderBalancingExecutor =
@@ -115,8 +110,6 @@ public class RouteBalancer {
public RouteBalancer(IManager configManager) {
this.configManager = configManager;
-
- this.leaderCache = new ConcurrentHashMap<>();
this.regionRouteMap = new RegionRouteMap();
switch (CONF.getLeaderDistributionPolicy()) {
@@ -140,27 +133,6 @@ public class RouteBalancer {
}
}
- /**
- * Cache the newest leaderHeartbeatSample
- *
- * @param regionGroupId Corresponding RegionGroup's index
- * @param leaderSample <Sample timestamp, leaderDataNodeId>, The newest HeartbeatSample
- */
- public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
- if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
- && IS_DATA_REGION_IOT_CONSENSUS) {
- // The leadership of IoTConsensus protocol is decided by ConfigNode-leader
- return;
- }
-
- leaderCache.putIfAbsent(regionGroupId, leaderSample);
- synchronized (leaderCache.get(regionGroupId)) {
- if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) {
- leaderCache.replace(regionGroupId, leaderSample);
- }
- }
- }
-
/**
* Invoking periodically to update the RegionRouteMap
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java
new file mode 100644
index 0000000000..4fcb27d00d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatSampleCache.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Maintain all kinds of heartbeat samples */
+public class HeartbeatSampleCache {
+
+ private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
+ ConsensusFactory.IOT_CONSENSUS.equals(ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass());
+
+ // Map<NodeId, INodeCache>
+ private final Map<Integer, BaseNodeCache> nodeCacheMap;
+ // Map<RegionGroupId, RegionGroupCache>
+ private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+ // Key: RegionGroupId
+ // Value: Pair<Timestamp, LeaderDataNodeId>, where
+ // the left value stands for sampling timestamp
+ // and the right value stands for the index of DataNode that leader resides.
+ private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
+
+ public HeartbeatSampleCache() {
+ this.nodeCacheMap = new ConcurrentHashMap<>();
+ this.regionGroupCacheMap = new ConcurrentHashMap<>();
+ this.leaderCache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Cache the latest heartbeat sample of a ConfigNode.
+ *
+ * @param nodeId the id of the ConfigNode
+ * @param sample the latest heartbeat sample
+ */
+ public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+ nodeCacheMap.computeIfAbsent(nodeId,
+ empty -> new ConfigNodeHeartbeatCache(nodeId))
+ .cacheHeartbeatSample(sample);
+ }
+
+ /**
+ * Cache the latest heartbeat sample of a DataNode.
+ *
+ * @param nodeId the id of the DataNode
+ * @param sample the latest heartbeat sample
+ */
+ public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+ nodeCacheMap.computeIfAbsent(nodeId,
+ empty -> new DataNodeHeartbeatCache())
+ .cacheHeartbeatSample(sample);
+ }
+
+ /**
+ * Cache the latest heartbeat sample of a RegionGroup.
+ *
+ * @param regionGroupId the id of the RegionGroup
+ * @param nodeId the id of the DataNode where specified Region resides
+ * @param sample the latest heartbeat sample
+ */
+ public void cacheRegionHeartbeatSample(TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) {
+ regionGroupCacheMap.computeIfAbsent(regionGroupId,
+ empty -> new RegionGroupCache(regionGroupId))
+ .cacheHeartbeatSample(nodeId, sample);
+ }
+
+ /**
+ * Cache the latest leader of a RegionGroup, keeping the sample with the newest timestamp.
+ *
+ * @param regionGroupId the id of the RegionGroup
+ * @param leaderSample {@code Pair<sample timestamp, leader DataNodeId>} of the RegionGroup
+ */
+ public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
+ if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+ && IS_DATA_REGION_IOT_CONSENSUS) {
+ // The leadership of IoTConsensus protocol is decided by ConfigNode-leader
+ return;
+ }
+
+ // merge() is atomic on ConcurrentHashMap. Locking on the mapped Pair is unsafe: the
+ // monitor changes on every replace() and get() returns null after a concurrent clear().
+ leaderCache.merge(
+ regionGroupId,
+ leaderSample,
+ (cached, latest) -> cached.getLeft() < latest.getLeft() ? latest : cached);
+ }
+
+ public void clear() {
+ nodeCacheMap.clear();
+ regionGroupCacheMap.clear();
+ leaderCache.clear();
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
new file mode 100644
index 0000000000..056ecfd76c
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatService.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Maintain the Cluster-Heartbeat-Service */
+public class HeartbeatService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);
+
+ private static final long HEARTBEAT_INTERVAL = ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+ private final IManager configManager;
+
+ /** Heartbeat executor service */
+ // Monitor for leadership change
+ private final Object heartbeatScheduleMonitor = new Object();
+ private Future<?> currentHeartbeatFuture;
+ private final ScheduledExecutorService heartBeatExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
+ private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+
+ private final HeartbeatSampleCache heartbeatSampleCache;
+
+ public HeartbeatService(IManager configManager) {
+ this.configManager = configManager;
+ this.heartbeatSampleCache = new HeartbeatSampleCache();
+ }
+
+ /** Start the heartbeat service */
+ public void startHeartbeatService() {
+ synchronized (heartbeatScheduleMonitor) {
+ if (currentHeartbeatFuture == null) {
+ currentHeartbeatFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ heartBeatExecutor,
+ this::heartbeatLoopBody,
+ 0,
+ HEARTBEAT_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Heartbeat service is started successfully.");
+ }
+ }
+ }
+
+ /** Stop the heartbeat service */
+ public void stopHeartbeatService() {
+ synchronized (heartbeatScheduleMonitor) {
+ if (currentHeartbeatFuture != null) {
+ currentHeartbeatFuture.cancel(false);
+ currentHeartbeatFuture = null;
+ heartbeatSampleCache.clear();
+ LOGGER.info("Heartbeat service is stopped successfully.");
+ }
+ }
+ }
+
+ private THeartbeatReq genHeartbeatReq() {
+ /* Generate heartbeat request */
+ THeartbeatReq heartbeatReq = new THeartbeatReq();
+ heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+ // Always sample RegionGroups' leadership as the Region heartbeat
+ heartbeatReq.setNeedJudgeLeader(true);
+ // We sample DataNode's load in every 10 heartbeat loop
+ heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
+
+ /* Update heartbeat counter */
+ heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+ if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
+ heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
+ heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
+ heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
+ }
+ return heartbeatReq;
+ }
+
+ /** loop body of the heartbeat thread */
+ private void heartbeatLoopBody() {
+ // The consensusManager of configManager may not be fully initialized at this time
+ Optional.ofNullable(getConsensusManager())
+ .ifPresent(
+ consensusManager -> {
+ if (getConsensusManager().isLeader()) {
+ // Generate HeartbeatReq
+ THeartbeatReq heartbeatReq = genHeartbeatReq();
+ // Send heartbeat requests to all the registered ConfigNodes
+ pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
+ // Send heartbeat requests to all the registered DataNodes
+ pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
+ }
+ });
+ }
+
+ /**
+ * Send heartbeat requests to all the Registered ConfigNodes
+ *
+ * @param registeredConfigNodes ConfigNodes that registered in cluster
+ */
+ private void pingRegisteredConfigNodes(
+ THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+ // Send heartbeat requests
+ for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+ if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
+ // Skip itself
+ continue;
+ }
+
+ ConfigNodeHeartbeatHandler handler =
+ new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), heartbeatSampleCache);
+ AsyncConfigNodeHeartbeatClientPool.getInstance()
+ .getConfigNodeHeartBeat(
+ configNodeLocation.getInternalEndPoint(),
+ heartbeatReq.getHeartbeatTimestamp(),
+ handler);
+ }
+ }
+
+ /**
+ * Send heartbeat requests to all the Registered DataNodes
+ *
+ * @param registeredDataNodes DataNodes that registered in cluster
+ */
+ private void pingRegisteredDataNodes(
+ THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+ // Send heartbeat requests
+ for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
+ DataNodeHeartbeatHandler handler =
+ new DataNodeHeartbeatHandler(
+ dataNodeInfo.getLocation().getDataNodeId(),
+ heartbeatSampleCache,
+ configManager.getClusterQuotaManager().getDeviceNum(),
+ configManager.getClusterQuotaManager().getTimeSeriesNum(),
+ configManager.getClusterQuotaManager().getRegionDisk());
+ configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
+ AsyncDataNodeHeartbeatClientPool.getInstance()
+ .getDataNodeHeartBeat(
+ dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+ }
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configManager.getConsensusManager();
+ }
+
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
+ }
+
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
index 24f9a9edb5..392ae24b6e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/BaseNodeCache.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
import java.util.LinkedList;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
index 17fc9b1eb2..afd9d3e197 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/ConfigNodeHeartbeatCache.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
public class ConfigNodeHeartbeatCache extends BaseNodeCache {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
index 5754d27320..cb20836120 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/DataNodeHeartbeatCache.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java
index dceff727bc..b3857c11f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/node/NodeHeartbeatSample.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java
index 706b40e3e8..6aff8394e9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionCache.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+import static org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
public class RegionCache {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
similarity index 96%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
index 7e490670a9..fb5cf7766f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionGroupCache.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.region;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
import java.util.HashMap;
import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java
similarity index 95%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java
index 8de58a5e93..88e94e4be2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/region/RegionHeartbeatSample.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.heartbeat.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java
similarity index 96%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java
index 627ac00a33..a0a8645209 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/NodeStatistics.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+package org.apache.iotdb.confignode.manager.load.statistics;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
index d36175bc67..d22ea0e4d6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionGroupStatistics.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java
index d30bf43d96..87bdf39f91 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/RegionStatistics.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+package org.apache.iotdb.confignode.manager.load.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
similarity index 53%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
index 8de58a5e93..79b8ad1869 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/statistics/StatisticsService.java
@@ -16,34 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
-import org.apache.iotdb.commons.cluster.RegionStatus;
+package org.apache.iotdb.confignode.manager.load.statistics;
-public class RegionHeartbeatSample {
+public class StatisticsService {
- // Unit: ms
- private final long sendTimestamp;
- private final long receiveTimestamp;
- private final RegionStatus status;
-
- // TODO: Add load sample
-
- public RegionHeartbeatSample(long sendTimestamp, long receiveTimestamp, RegionStatus status) {
- this.sendTimestamp = sendTimestamp;
- this.receiveTimestamp = receiveTimestamp;
- this.status = status;
- }
-
- public long getSendTimestamp() {
- return sendTimestamp;
- }
-
- public long getReceiveTimestamp() {
- return receiveTimestamp;
- }
-
- public RegionStatus getStatus() {
- return status;
- }
+
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 97125db05a..62ce597f2f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -27,18 +27,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -60,9 +54,9 @@ import org.apache.iotdb.confignode.manager.TriggerManager;
import org.apache.iotdb.confignode.manager.UDFManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -81,7 +75,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -98,9 +91,6 @@ import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -119,23 +109,12 @@ public class NodeManager {
private final ReentrantLock removeConfigNodeLock;
- /** Heartbeat executor service */
- // Monitor for leadership change
- private final Object scheduleMonitor = new Object();
- // Map<NodeId, INodeCache>
- private final Map<Integer, BaseNodeCache> nodeCacheMap;
- private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
- private Future<?> currentHeartbeatFuture;
- private final ScheduledExecutorService heartBeatExecutor =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
-
private final Random random;
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
this.nodeInfo = nodeInfo;
this.removeConfigNodeLock = new ReentrantLock();
- this.nodeCacheMap = new ConcurrentHashMap<>();
this.random = new Random(System.currentTimeMillis());
}
@@ -687,125 +666,6 @@ public class NodeManager {
}
}
- /** Start the heartbeat service */
- public void startHeartbeatService() {
- synchronized (scheduleMonitor) {
- if (currentHeartbeatFuture == null) {
- currentHeartbeatFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- heartBeatExecutor,
- this::heartbeatLoopBody,
- 0,
- HEARTBEAT_INTERVAL,
- TimeUnit.MILLISECONDS);
- LOGGER.info("Heartbeat service is started successfully.");
- }
- }
- }
-
- /** loop body of the heartbeat thread */
- private void heartbeatLoopBody() {
- // The consensusManager of configManager may not be fully initialized at this time
- Optional.ofNullable(getConsensusManager())
- .ifPresent(
- consensusManager -> {
- if (getConsensusManager().isLeader()) {
- // Generate HeartbeatReq
- THeartbeatReq heartbeatReq = genHeartbeatReq();
- // Send heartbeat requests to all the registered DataNodes
- pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
- // Send heartbeat requests to all the registered ConfigNodes
- pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
- }
- });
- }
-
- private THeartbeatReq genHeartbeatReq() {
- /* Generate heartbeat request */
- THeartbeatReq heartbeatReq = new THeartbeatReq();
- heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
- // Always sample RegionGroups' leadership as the Region heartbeat
- heartbeatReq.setNeedJudgeLeader(true);
- // We sample DataNode's load in every 10 heartbeat loop
- heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
- /* Update heartbeat counter */
- heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
- if (!getClusterQuotaManager().hasSpaceQuotaLimit()) {
- heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds());
- heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds());
- heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage());
- }
- return heartbeatReq;
- }
-
- /**
- * Send heartbeat requests to all the Registered DataNodes
- *
- * @param registeredDataNodes DataNodes that registered in cluster
- */
- private void pingRegisteredDataNodes(
- THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
- // Send heartbeat requests
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- DataNodeHeartbeatHandler handler =
- new DataNodeHeartbeatHandler(
- dataNodeInfo.getLocation(),
- (DataNodeHeartbeatCache)
- nodeCacheMap.computeIfAbsent(
- dataNodeInfo.getLocation().getDataNodeId(),
- empty -> new DataNodeHeartbeatCache()),
- getPartitionManager().getRegionGroupCacheMap(),
- getLoadManager().getRouteBalancer(),
- getClusterQuotaManager().getDeviceNum(),
- getClusterQuotaManager().getTimeSeriesNum(),
- getClusterQuotaManager().getRegionDisk());
- getClusterQuotaManager().updateSpaceQuotaUsage();
- AsyncDataNodeHeartbeatClientPool.getInstance()
- .getDataNodeHeartBeat(
- dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
- }
- }
-
- /**
- * Send heartbeat requests to all the Registered ConfigNodes
- *
- * @param registeredConfigNodes ConfigNodes that registered in cluster
- */
- private void pingRegisteredConfigNodes(
- THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
- // Send heartbeat requests
- for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
- if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
- // Skip itself
- continue;
- }
-
- ConfigNodeHeartbeatHandler handler =
- new ConfigNodeHeartbeatHandler(
- (ConfigNodeHeartbeatCache)
- nodeCacheMap.computeIfAbsent(
- configNodeLocation.getConfigNodeId(),
- empty -> new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())));
- AsyncConfigNodeHeartbeatClientPool.getInstance()
- .getConfigNodeHeartBeat(
- configNodeLocation.getInternalEndPoint(),
- heartbeatReq.getHeartbeatTimestamp(),
- handler);
- }
- }
-
- /** Stop the heartbeat service */
- public void stopHeartbeatService() {
- synchronized (scheduleMonitor) {
- if (currentHeartbeatFuture != null) {
- currentHeartbeatFuture.cancel(false);
- currentHeartbeatFuture = null;
- nodeCacheMap.clear();
- LOGGER.info("Heartbeat service is stopped successfully.");
- }
- }
- }
public Map<Integer, BaseNodeCache> getNodeCacheMap() {
return nodeCacheMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
index a38adafd74..4c44fb07c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.manager.observer;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 223bee15e4..a754ccf396 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -68,7 +68,7 @@ import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -128,15 +128,11 @@ public class PartitionManager {
private final ScheduledExecutorService regionMaintainer;
private Future<?> currentRegionMaintainerFuture;
- // Map<RegionId, RegionGroupCache>
- private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
-
public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
this.partitionInfo = partitionInfo;
this.regionMaintainer =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Maintainer");
- this.regionGroupCacheMap = new ConcurrentHashMap<>();
setSeriesPartitionExecutor();
}
@@ -1070,10 +1066,6 @@ public class PartitionManager {
}
}
- public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
- return regionGroupCacheMap;
- }
-
public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
regionGroupCacheMap.remove(consensusGroupId);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index efa53d330d..5faefbcd4f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -49,10 +49,10 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index afa2d3c543..bf1ce7e940 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index cb617bf027..6da0304f44 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index a028219b08..f1bf444427 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index 4d93b6355b..b51cf10bcc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
index 3c3cf4881b..da91dc82dc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.confignode.manager.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.region.RegionHeartbeatSample;
import org.junit.Assert;
import org.junit.Test;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
index 4181fa9d67..766a5ed48f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.persistence.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.NodeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
index cce3ed250a..b0b97b2b0c 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.confignode.persistence.partition.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
index 8ccf87c7c2..4ad69092ef 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.persistence.partition.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.statistics.RegionStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;