You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/30 12:52:54 UTC
[iotdb] branch master updated: [IOTDB-4796] Remove LoadStatistics consensus feature (#7799)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2e9fa3c91c [IOTDB-4796] Remove LoadStatistics consensus feature (#7799)
2e9fa3c91c is described below
commit 2e9fa3c91c99f27c30ed043ffe71f428910f4bd4
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Oct 30 20:52:50 2022 +0800
[IOTDB-4796] Remove LoadStatistics consensus feature (#7799)
---
.../heartbeat/ConfigNodeHeartbeatHandler.java | 4 +-
.../heartbeat/DataNodeHeartbeatHandler.java | 39 ++---
.../consensus/request/ConfigPhysicalPlan.java | 4 -
.../consensus/request/ConfigPhysicalPlanType.java | 5 +-
.../write/statistics/UpdateLoadStatisticsPlan.java | 161 ---------------------
.../statemachine/PartitionRegionStateMachine.java | 4 +-
.../iotdb/confignode/manager/load/LoadManager.java | 131 +++++++++++------
.../manager/load/balancer/RouteBalancer.java | 53 ++++---
.../load/balancer/router/RegionRouteMap.java | 14 +-
.../iotdb/confignode/manager/node/NodeManager.java | 95 +++---------
.../node/{ => heartbeat}/BaseNodeCache.java | 68 ++++++---
.../{ => heartbeat}/ConfigNodeHeartbeatCache.java | 36 +++--
.../{ => heartbeat}/DataNodeHeartbeatCache.java | 11 +-
.../node/{ => heartbeat}/NodeHeartbeatSample.java | 2 +-
.../node/heartbeat}/NodeStatistics.java | 7 +-
.../manager/partition/PartitionManager.java | 55 +------
.../partition/{ => heartbeat}/RegionCache.java | 7 +-
.../{ => heartbeat}/RegionGroupCache.java | 84 +++++++----
.../heartbeat}/RegionGroupStatistics.java | 28 ++--
.../{ => heartbeat}/RegionHeartbeatSample.java | 2 +-
.../partition/heartbeat}/RegionStatistics.java | 7 +-
.../persistence/executor/ConfigPlanExecutor.java | 8 -
.../confignode/persistence/node/NodeInfo.java | 62 +-------
.../persistence/partition/PartitionInfo.java | 116 +--------------
.../persistence/partition/RegionGroup.java | 2 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 6 +-
.../procedure/env/DataNodeRemoveHandler.java | 2 +-
.../OperatePipeProcedureRollbackProcessor.java | 2 +-
.../iotdb/confignode/service/ConfigNode.java | 4 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 7 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 60 --------
.../load/balancer/router/LeaderRouterTest.java | 8 +-
.../balancer/router/LoadScoreGreedyRouterTest.java | 8 +-
.../confignode/manager/node/NodeCacheTest.java | 17 ++-
.../manager/partition/RegionGroupCacheTest.java | 33 ++++-
.../iotdb/confignode/persistence/NodeInfoTest.java | 13 --
.../confignode/persistence/PartitionInfoTest.java | 43 ------
.../persistence/node/NodeStatisticsTest.java | 1 +
.../statistics/RegionGroupStatisticsTest.java | 2 +
.../partition/statistics/RegionStatisticsTest.java | 1 +
integration-test/import-control.xml | 1 +
...thorizeIT.java => IoTDBClusterAuthorityIT.java} | 2 +-
.../it/IoTDBConfigNodeSwitchLeaderIT.java | 69 +--------
43 files changed, 410 insertions(+), 874 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 62f2ea3676..c97f3d8bb4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
-import org.apache.iotdb.confignode.manager.node.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.thrift.async.AsyncMethodCallback;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 3b24e17189..28e1d26254 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -22,10 +22,10 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -60,31 +60,24 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
dataNodeHeartbeatCache.cacheHeartbeatSample(
new NodeHeartbeatSample(heartbeatResp, receiveTime));
- // Update RegionGroupCache
+ // Update RegionGroupCache and leaderCache
heartbeatResp
.getJudgedLeaders()
- .keySet()
.forEach(
- consensusGroupId ->
- regionGroupCacheMap
- .computeIfAbsent(
- consensusGroupId, empty -> new RegionGroupCache(consensusGroupId))
- .cacheHeartbeatSample(
- dataNodeLocation.getDataNodeId(),
- new RegionHeartbeatSample(
- heartbeatResp.getHeartbeatTimestamp(),
- receiveTime,
- // Region will inherit DataNode's status
- RegionStatus.parse(heartbeatResp.getStatus()))));
+ (regionGroupId, isLeader) -> {
+ regionGroupCacheMap
+ .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+ .cacheHeartbeatSample(
+ dataNodeLocation.getDataNodeId(),
+ new RegionHeartbeatSample(
+ heartbeatResp.getHeartbeatTimestamp(),
+ receiveTime,
+ // Region will inherit DataNode's status
+ RegionStatus.parse(heartbeatResp.getStatus())));
- // Update leaderCache
- heartbeatResp
- .getJudgedLeaders()
- .forEach(
- (consensusGroupId, isLeader) -> {
if (isLeader) {
routeBalancer.cacheLeaderSample(
- consensusGroupId,
+ regionGroupId,
new Pair<>(
heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 337829fb9b..ff814ee4d2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -64,7 +64,6 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProce
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
@@ -343,9 +342,6 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case GetSeriesSlotList:
plan = new GetSeriesSlotListPlan();
break;
- case UpdateLoadStatistics:
- plan = new UpdateLoadStatisticsPlan();
- break;
case UpdateTriggersOnTransferNodes:
plan = new UpdateTriggersOnTransferNodesPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 7124e8e764..d549ca450c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -136,10 +136,7 @@ public enum ConfigPhysicalPlanType {
ACTIVE_CQ((short) 1101),
ADD_CQ((short) 1102),
UPDATE_CQ_LAST_EXEC_TIME((short) 1103),
- SHOW_CQ((short) 1104),
-
- // TODO: DELETE IT
- UpdateLoadStatistics((short) 6666);
+ SHOW_CQ((short) 1104);
private final short planType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/statistics/UpdateLoadStatisticsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/statistics/UpdateLoadStatisticsPlan.java
deleted file mode 100644
index a16c60ea39..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/statistics/UpdateLoadStatisticsPlan.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.consensus.request.write.statistics;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TTransport;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class UpdateLoadStatisticsPlan extends ConfigPhysicalPlan {
-
- // Map<NodeId, newNodeStatistics>
- private final Map<Integer, NodeStatistics> nodeStatisticsMap;
-
- // Map<TConsensusGroupId, newRegionGroupStatistics>
- private final Map<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsMap;
-
- private final RegionRouteMap regionRouteMap;
-
- public UpdateLoadStatisticsPlan() {
- super(ConfigPhysicalPlanType.UpdateLoadStatistics);
- this.nodeStatisticsMap = new ConcurrentHashMap<>();
- this.regionGroupStatisticsMap = new ConcurrentHashMap<>();
- this.regionRouteMap = new RegionRouteMap();
- }
-
- /**
- * Check if the current UpdateLoadStatisticsPlan is empty
- *
- * @return True if nodeStatisticsMap, regionGroupStatisticsMap or RegionRouteMap is empty
- */
- public boolean isEmpty() {
- return nodeStatisticsMap.isEmpty()
- && regionGroupStatisticsMap.isEmpty()
- && regionRouteMap.isEmpty();
- }
-
- public void putNodeStatistics(int nodeId, NodeStatistics nodeStatistics) {
- this.nodeStatisticsMap.put(nodeId, nodeStatistics);
- }
-
- public Map<Integer, NodeStatistics> getNodeStatisticsMap() {
- return nodeStatisticsMap;
- }
-
- public void putRegionGroupStatistics(
- TConsensusGroupId consensusGroupId, RegionGroupStatistics regionGroupStatistics) {
- this.regionGroupStatisticsMap.put(consensusGroupId, regionGroupStatistics);
- }
-
- public Map<TConsensusGroupId, RegionGroupStatistics> getRegionGroupStatisticsMap() {
- return regionGroupStatisticsMap;
- }
-
- public RegionRouteMap getRegionRouteMap() {
- return regionRouteMap;
- }
-
- public void setRegionRouteMap(RegionRouteMap regionRouteMap) {
- this.regionRouteMap.setRegionLeaderMap(regionRouteMap.getRegionLeaderMap());
- this.regionRouteMap.setRegionPriorityMap(regionRouteMap.getRegionPriorityMap());
- }
-
- @Override
- protected void serializeImpl(DataOutputStream stream) throws IOException {
- ReadWriteIOUtils.write(getType().getPlanType(), stream);
- try {
- TTransport transport = new TIOStreamTransport(stream);
- TBinaryProtocol protocol = new TBinaryProtocol(transport);
-
- ReadWriteIOUtils.write(nodeStatisticsMap.size(), stream);
- for (Map.Entry<Integer, NodeStatistics> nodeStatisticsEntry : nodeStatisticsMap.entrySet()) {
- ReadWriteIOUtils.write(nodeStatisticsEntry.getKey(), stream);
- nodeStatisticsEntry.getValue().serialize(stream);
- }
-
- ReadWriteIOUtils.write(regionGroupStatisticsMap.size(), stream);
- for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
- regionGroupStatisticsMap.entrySet()) {
- ThriftCommonsSerDeUtils.serializeTConsensusGroupId(
- regionGroupStatisticsEntry.getKey(), stream);
- regionGroupStatisticsEntry.getValue().serialize(stream);
- }
-
- regionRouteMap.serialize(stream, protocol);
- } catch (TException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- int nodeNum = buffer.getInt();
- for (int i = 0; i < nodeNum; i++) {
- int nodeId = buffer.getInt();
- NodeStatistics nodeStatistics = new NodeStatistics();
- nodeStatistics.deserialize(buffer);
- nodeStatisticsMap.put(nodeId, nodeStatistics);
- }
-
- int regionGroupNum = buffer.getInt();
- for (int i = 0; i < regionGroupNum; i++) {
- TConsensusGroupId consensusGroupId =
- ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer);
- RegionGroupStatistics regionGroupStatistics = new RegionGroupStatistics();
- regionGroupStatistics.deserialize(buffer);
- regionGroupStatisticsMap.put(consensusGroupId, regionGroupStatistics);
- }
-
- regionRouteMap.deserialize(buffer);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
- UpdateLoadStatisticsPlan that = (UpdateLoadStatisticsPlan) o;
- return nodeStatisticsMap.equals(that.nodeStatisticsMap)
- && regionGroupStatisticsMap.equals(that.regionGroupStatisticsMap)
- && regionRouteMap.equals(that.regionRouteMap);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- super.hashCode(), nodeStatisticsMap, regionGroupStatisticsMap, regionRouteMap);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 24ece875f2..f6b940d548 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -176,8 +176,8 @@ public class PartitionRegionStateMachine
newLeaderId,
currentNodeTEndPoint);
- // Recover HeartbeatCache by inheriting old-leader's statistics result
- configManager.getLoadManager().recoverHeartbeatCache();
+ // Always initialize all kinds of HeartbeatCache first
+ configManager.getLoadManager().initHeartbeatCache();
// Start leader scheduling services
configManager.getProcedureManager().shiftExecutor(true);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index bda6eea4f0..ed8564617c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
@@ -47,8 +46,12 @@ import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
@@ -180,52 +183,104 @@ public class LoadManager {
}
private void updateLoadStatistics() {
- UpdateLoadStatisticsPlan updateLoadStatisticsPlan = new UpdateLoadStatisticsPlan();
+ // Broadcast the RegionRouteMap if some LoadStatistics has changed
+ boolean isNeedBroadcast = false;
// Update NodeStatistics
+ Map<Integer, NodeStatistics> differentNodeStatisticsMap = new ConcurrentHashMap<>();
getNodeManager()
.getNodeCacheMap()
.forEach(
(nodeId, nodeCache) -> {
- // Update and check if NodeStatistics needs consensus
- nodeCache.updateNodeStatistics();
- if (!nodeCache
- .getStatistics()
- .equals(getNodeManager().getNodeStatistics(nodeId, false))) {
- updateLoadStatisticsPlan.putNodeStatistics(nodeId, nodeCache.getStatistics());
+ if (nodeCache.periodicUpdate()) {
+ // Update and record the changed NodeStatistics
+ differentNodeStatisticsMap.put(nodeId, nodeCache.getStatistics());
}
});
+ if (!differentNodeStatisticsMap.isEmpty()) {
+ isNeedBroadcast = true;
+ recordNodeStatistics(differentNodeStatisticsMap);
+ }
// Update RegionGroupStatistics
+ Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+ new ConcurrentHashMap<>();
getPartitionManager()
.getRegionGroupCacheMap()
.forEach(
(regionGroupId, regionGroupCache) -> {
- // Update and check if RegionGroupStatistics needs consensus
- regionGroupCache.updateRegionGroupStatistics();
- if (!regionGroupCache
- .getStatistics()
- .equals(getPartitionManager().getRegionGroupStatistics(regionGroupId, false))) {
- updateLoadStatisticsPlan.putRegionGroupStatistics(
+ if (regionGroupCache.periodicUpdate()) {
+ // Update and record the changed RegionGroupStatistics
+ differentRegionGroupStatisticsMap.put(
regionGroupId, regionGroupCache.getStatistics());
}
});
+ if (!differentRegionGroupStatisticsMap.isEmpty()) {
+ isNeedBroadcast = true;
+ recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+ }
// Update RegionRouteMap
- routeBalancer.updateRegionRouteMap();
- if (!routeBalancer
- .getLatestRegionRouteMap()
- .equals(getPartitionManager().getRegionRouteMap())) {
- updateLoadStatisticsPlan.setRegionRouteMap(routeBalancer.getLatestRegionRouteMap());
+ if (routeBalancer.updateRegionRouteMap()) {
+ isNeedBroadcast = true;
+ recordRegionRouteMap(routeBalancer.getLatestRegionRouteMap());
}
- // Consensus-write LoadStatistics if not empty
- if (!updateLoadStatisticsPlan.isEmpty()) {
- getConsensusManager().write(updateLoadStatisticsPlan);
+ if (isNeedBroadcast) {
broadcastLatestRegionRouteMap();
}
}
+ private void recordNodeStatistics(Map<Integer, NodeStatistics> differentNodeStatisticsMap) {
+ LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
+ for (Map.Entry<Integer, NodeStatistics> nodeCacheEntry :
+ differentNodeStatisticsMap.entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t {}={}",
+ "nodeId{" + nodeCacheEntry.getKey() + "}",
+ nodeCacheEntry.getValue());
+ }
+ }
+
+ private void recordRegionGroupStatistics(
+ Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
+ LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
+ for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
+ differentRegionGroupStatisticsMap.entrySet()) {
+ LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
+ LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
+ for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
+ regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t dataNodeId{}={}",
+ regionStatisticsEntry.getKey(),
+ regionStatisticsEntry.getValue());
+ }
+ }
+ }
+
+ private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+ LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
+ for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
+ regionRouteMap.getRegionLeaderMap().entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t {}={}",
+ regionLeaderEntry.getKey(),
+ regionLeaderEntry.getValue());
+ }
+
+ LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
+ for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
+ regionRouteMap.getRegionPriorityMap().entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t {}={}",
+ regionPriorityEntry.getKey(),
+ regionPriorityEntry.getValue().getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
+ }
+ }
+
public void broadcastLatestRegionRouteMap() {
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = getLatestRegionRouteMap();
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
@@ -236,9 +291,8 @@ public class LoadManager {
dataNodeLocationMap.put(
onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
- LOGGER.info("[latestRegionRouteMap] Begin to broadcast RegionRouteMap:");
+ LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
long broadcastTime = System.currentTimeMillis();
- printRegionRouteMap(broadcastTime, latestRegionRouteMap);
AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
new AsyncClientHandler<>(
@@ -246,31 +300,14 @@ public class LoadManager {
new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap finished.");
+ LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
}
- public static void printRegionRouteMap(
- long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> regionRouteMap) {
- LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
- LOGGER.info("[latestRegionRouteMap] RegionRouteMap:");
- for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> entry : regionRouteMap.entrySet()) {
- LOGGER.info(
- "[latestRegionRouteMap]\t {}={}",
- entry.getKey(),
- entry.getValue().getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toList()));
- }
- }
-
- /**
- * Recover the cluster heartbeat cache through loadStatistics when the ConfigNode-Leader is
- * switched
- */
- public void recoverHeartbeatCache() {
- getNodeManager().recoverNodeCacheMap();
- getPartitionManager().recoverRegionGroupCacheMap();
- routeBalancer.recoverRegionRouteMap();
+ /** Initialize all kinds of the HeartbeatCache when the ConfigNode-Leader is switched */
+ public void initHeartbeatCache() {
+ getNodeManager().initNodeHeartbeatCache();
+ getPartitionManager().initRegionGroupHeartbeatCache();
+ routeBalancer.initRegionRouteMap();
}
public RouteBalancer getRouteBalancer() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 1f85d755c2..6ee52b0415 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.load.balancer;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -44,7 +45,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
/**
* The RouteBalancer will maintain cluster RegionRouteMap, which contains:
@@ -87,7 +90,6 @@ public class RouteBalancer {
this.configManager = configManager;
this.leaderCache = new ConcurrentHashMap<>();
-
this.regionRouteMap = new RegionRouteMap();
switch (ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy()) {
case GREEDY_POLICY:
@@ -120,30 +122,36 @@ public class RouteBalancer {
}
}
- /** Invoking periodically to update the latest RegionRouteMap */
- public void updateRegionRouteMap() {
+ /**
+ * Invoked periodically to update the RegionRouteMap
+ *
+ * @return True if the RegionRouteMap has changed, false otherwise
+ */
+ public boolean updateRegionRouteMap() {
synchronized (regionRouteMap) {
- updateRegionLeaderMap();
- updateRegionPriorityMap();
+ return updateRegionLeaderMap() | updateRegionPriorityMap();
}
}
- private void updateRegionLeaderMap() {
+ private boolean updateRegionLeaderMap() {
+ AtomicBoolean isLeaderChanged = new AtomicBoolean(false);
leaderCache.forEach(
(regionGroupId, leadershipSample) -> {
if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) && isMultiLeader) {
- // Ignore update leader when using multi-leader consensus protocol
+ // Skip leader updates for DataRegions that use the MultiLeader consensus protocol
return;
}
if (leadershipSample.getRight() != regionRouteMap.getLeader(regionGroupId)) {
// Update leader
regionRouteMap.setLeader(regionGroupId, leadershipSample.getRight());
+ isLeaderChanged.set(true);
}
});
+ return isLeaderChanged.get();
}
- private void updateRegionPriorityMap() {
+ private boolean updateRegionPriorityMap() {
Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap();
Map<Integer, Long> dataNodeLoadScoreMap = getNodeManager().getAllLoadScores();
@@ -162,6 +170,9 @@ public class RouteBalancer {
if (!latestRegionPriorityMap.equals(regionRouteMap.getRegionPriorityMap())) {
regionRouteMap.setRegionPriorityMap(latestRegionPriorityMap);
+ return true;
+ } else {
+ return false;
}
}
@@ -223,6 +234,7 @@ public class RouteBalancer {
currentLeaderBalancingFuture.cancel(false);
currentLeaderBalancingFuture = null;
leaderCache.clear();
+ regionRouteMap.clear();
LOGGER.info("Route-Balancing service is stopped successfully.");
}
}
@@ -232,16 +244,23 @@ public class RouteBalancer {
// TODO: IOTDB-4768
}
- /** Recover the regionRouteMap when the ConfigNode-Leader is switched */
- public void recoverRegionRouteMap() {
+ /** Initialize the regionRouteMap when the ConfigNode-Leader is switched */
+ public void initRegionRouteMap() {
synchronized (regionRouteMap) {
- RegionRouteMap inheritRegionRouteMap = getPartitionManager().getRegionRouteMap();
- regionRouteMap.setRegionLeaderMap(
- new ConcurrentHashMap<>(inheritRegionRouteMap.getRegionLeaderMap()));
- regionRouteMap.setRegionPriorityMap(
- new ConcurrentHashMap<>(inheritRegionRouteMap.getRegionPriorityMap()));
-
- LOGGER.info("[InheritLoadStatistics] RegionRouteMap: {}", regionRouteMap);
+ regionRouteMap.clear();
+ if (isMultiLeader) {
+ // Greedily pick a leader for every existing DataRegionGroup
+ List<TRegionReplicaSet> dataRegionGroups =
+ getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion);
+ for (TRegionReplicaSet dataRegionGroup : dataRegionGroups) {
+ greedySelectLeader(
+ dataRegionGroup.getRegionId(),
+ dataRegionGroup.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
+ }
+ }
+ updateRegionRouteMap();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
index 906fd610ca..f2d1971d30 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
@@ -50,6 +50,10 @@ public class RegionRouteMap {
this.regionPriorityMap = new ConcurrentHashMap<>();
}
+ /**
+ * @return the DataNodeId where the specified RegionGroup's leader resides, or -1 if the
+ * leader is not recorded yet
+ */
public int getLeader(TConsensusGroupId regionGroupId) {
return regionLeaderMap.getOrDefault(regionGroupId, -1);
}
@@ -159,14 +163,4 @@ public class RegionRouteMap {
public int hashCode() {
return Objects.hash(regionLeaderMap, regionPriorityMap);
}
-
- @Override
- public String toString() {
- return "RegionRouteMap{"
- + "regionLeaderMap="
- + regionLeaderMap
- + ", regionPriorityMap="
- + regionPriorityMap
- + '}';
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 9ae8e82276..53b1d61794 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -57,10 +56,12 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.metric.NodeInfoMetrics;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
@@ -107,9 +108,6 @@ public class NodeManager {
private static final long UNKNOWN_DATANODE_DETECT_INTERVAL =
CONF.getUnknownDataNodeDetectInterval();
- public static final TEndPoint CURRENT_NODE =
- new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
-
// when fail to register a new node, set node id to -1
private static final int ERROR_STATUS_NODE_ID = -1;
@@ -498,7 +496,7 @@ public class NodeManager {
info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
info.setRoleType(
- configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)
+ configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID
? RegionRoleType.Leader.name()
: RegionRoleType.Follower.name());
configNodeInfoList.add(info);
@@ -717,7 +715,7 @@ public class NodeManager {
THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
// Send heartbeat requests
for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
- if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
+ if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
// Skip itself
continue;
}
@@ -727,7 +725,7 @@ public class NodeManager {
(ConfigNodeHeartbeatCache)
nodeCacheMap.computeIfAbsent(
configNodeLocation.getConfigNodeId(),
- empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
+ empty -> new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())));
AsyncConfigNodeHeartbeatClientPool.getInstance()
.getConfigNodeHeartBeat(
configNodeLocation.getInternalEndPoint(),
@@ -933,79 +931,34 @@ public class NodeManager {
return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
}
- /** Recover the nodeCacheMap when the ConfigNode-Leader is switched */
- public void recoverNodeCacheMap() {
- Map<Integer, NodeStatistics> nodeStatisticsMap = nodeInfo.getNodeStatisticsMap();
+ /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
+ public void initNodeHeartbeatCache() {
+ final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
nodeCacheMap.clear();
- LOGGER.info("[InheritLoadStatistics] Start to inherit NodeStatistics...");
- // Force update ConfigNode-leader
- nodeCacheMap.put(
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- new ConfigNodeHeartbeatCache(
- new TConfigNodeLocation(
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- CURRENT_NODE,
- new TEndPoint(
- ConfigNodeDescriptor.getInstance().getConf().getInternalAddress(),
- ConfigNodeDescriptor.getInstance().getConf().getConsensusPort())),
- ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
-
- // Inherit ConfigNodeStatistics
+ // Init ConfigNodeHeartbeatCache
getRegisteredConfigNodes()
.forEach(
configNodeLocation -> {
- int configNodeId = configNodeLocation.getConfigNodeId();
- if (!configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)
- && nodeStatisticsMap.containsKey(configNodeId)) {
- nodeCacheMap.put(configNodeId, new ConfigNodeHeartbeatCache(configNodeLocation));
- nodeCacheMap
- .get(configNodeId)
- .forceUpdate(
- nodeStatisticsMap.get(configNodeId).convertToNodeHeartbeatSample());
- LOGGER.info(
- "[InheritLoadStatistics]\t {}={}",
- "nodeId{" + configNodeId + "}",
- nodeCacheMap.get(configNodeId).getStatistics());
+ if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
+ nodeCacheMap.put(
+ configNodeLocation.getConfigNodeId(),
+ new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
}
});
+ // Force set itself and never update
+ nodeCacheMap.put(
+ ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
+ new ConfigNodeHeartbeatCache(
+ CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
- // Inherit DataNodeStatistics
+ // Init DataNodeHeartbeatCache
getRegisteredDataNodes()
.forEach(
- dataNodeConfiguration -> {
- int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
- if (nodeStatisticsMap.containsKey(dataNodeId)) {
- nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache());
- nodeCacheMap
- .get(dataNodeId)
- .forceUpdate(nodeStatisticsMap.get(dataNodeId).convertToNodeHeartbeatSample());
- LOGGER.info(
- "[InheritLoadStatistics]\t {}={}",
- "nodeId{" + dataNodeId + "}",
- nodeCacheMap.get(dataNodeId).getStatistics());
- }
- });
-
- LOGGER.info("[InheritLoadStatistics] Inherit NodeStatistics finish");
- }
-
- /**
- * @param nodeId The specified Node's index
- * @param isLatest Is the NodeStatistics latest
- * @return NodeStatistics in NodeCache if the isLatest is set to True, NodeStatistics in NodeInfo
- * otherwise
- */
- public NodeStatistics getNodeStatistics(int nodeId, boolean isLatest) {
- if (isLatest) {
- return nodeCacheMap.containsKey(nodeId)
- ? nodeCacheMap.get(nodeId).getStatistics()
- : NodeStatistics.generateDefaultNodeStatistics();
- } else {
- return nodeInfo.getNodeStatisticsMap().containsKey(nodeId)
- ? nodeInfo.getNodeStatisticsMap().get(nodeId)
- : NodeStatistics.generateDefaultNodeStatistics();
- }
+ dataNodeConfiguration ->
+ nodeCacheMap.put(
+ dataNodeConfiguration.getLocation().getDataNodeId(),
+ new DataNodeHeartbeatCache()));
}
private ConsensusManager getConsensusManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
similarity index 54%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/BaseNodeCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
index 8675dd1aab..3559c47898 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
@@ -16,36 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node;
+package org.apache.iotdb.confignode.manager.node.heartbeat;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
import java.util.LinkedList;
/** All the statistic interfaces that provided by HeartbeatCache */
public abstract class BaseNodeCache {
- /** When the response time of heartbeat is more than 20s, the node is considered as down */
+ // When the response time of heartbeat is more than 20s, the Node is considered as down
public static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
- /** Max heartbeat cache samples store size */
+ // Max heartbeat cache samples store size
public static final int MAXIMUM_WINDOW_SIZE = 100;
- /** SlidingWindow stores the heartbeat sample data */
+ // SlidingWindow stores the heartbeat sample data
protected final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
- protected volatile NodeStatistics statistics;
+ // The previous NodeStatistics, used for comparing with
+ // the current NodeStatistics to initiate notification when they are different
+ protected volatile NodeStatistics previousStatistics;
+ // The current NodeStatistics, used for providing statistics to other services
+ protected volatile NodeStatistics currentStatistics;
/** Constructor for NodeCache with default NodeStatistics */
protected BaseNodeCache() {
- this.statistics = NodeStatistics.generateDefaultNodeStatistics();
+ this.previousStatistics = NodeStatistics.generateDefaultNodeStatistics();
+ this.currentStatistics = NodeStatistics.generateDefaultNodeStatistics();
}
/**
- * Cache the newest HeartbeatSample
+ * Cache the newest NodeHeartbeatSample
*
- * @param newHeartbeatSample The newest HeartbeatSample
+ * @param newHeartbeatSample The newest NodeHeartbeatSample
*/
public void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample) {
synchronized (slidingWindow) {
@@ -62,8 +66,23 @@ public abstract class BaseNodeCache {
}
}
- /** Invoking periodically to update Node's latest NodeStatistics */
- public abstract void updateNodeStatistics();
+ /**
+ * Invoked periodically in the Cluster-LoadStatistics-Service to update currentStatistics and
+ * compare it with the previousStatistics, in order to detect whether the Node's statistics have
+ * changed
+ *
+ * @return True if the currentStatistics has changed recently (compared with the
+ * previousStatistics), false otherwise
+ */
+ public boolean periodicUpdate() {
+ updateCurrentStatistics();
+ if (!currentStatistics.equals(previousStatistics)) {
+ previousStatistics = currentStatistics.deepCopy();
+ return true;
+ } else {
+ return false;
+ }
+ }
/**
* Actively append a custom NodeHeartbeatSample to force a change in the NodeStatistics.
@@ -71,38 +90,51 @@ public abstract class BaseNodeCache {
* <p>For example, this interface can be invoked in Node removing process to forcibly change the
* corresponding Node's status to Removing without waiting for heartbeat sampling
*
+ * <p>Notice: The ConfigNode-leader doesn't know the specified Node's statistics have changed
+ * even if this interface is invoked, since the ConfigNode-leader only detects cluster Nodes'
+ * statistics changes through the periodicUpdate interface. However, other services can still
+ * read the updated currentStatistics by invoking the getters below.
+ *
* @param newHeartbeatSample A custom NodeHeartbeatSample that will lead to needed NodeStatistics
*/
public void forceUpdate(NodeHeartbeatSample newHeartbeatSample) {
cacheHeartbeatSample(newHeartbeatSample);
- updateNodeStatistics();
+ updateCurrentStatistics();
}
+ /**
+ * Update currentStatistics based on recent NodeHeartbeatSamples cached in the slidingWindow
+ */
+ protected abstract void updateCurrentStatistics();
+
/**
* TODO: The loadScore of each Node will be changed to Double
*
* @return The latest load score of a node, the higher the score the higher the load
*/
public long getLoadScore() {
- return statistics.getLoadScore();
+ return currentStatistics.getLoadScore();
}
/** @return The current status of the Node */
public NodeStatus getNodeStatus() {
// Return a copy of status
- return NodeStatus.parse(statistics.getStatus().getStatus());
+ return NodeStatus.parse(currentStatistics.getStatus().getStatus());
}
/** @return The reason why lead to current NodeStatus */
public String getNodeStatusWithReason() {
- if (statistics.getStatusReason() == null) {
- return statistics.getStatus().getStatus();
+ if (currentStatistics.getStatusReason() == null) {
+ return currentStatistics.getStatus().getStatus();
} else {
- return statistics.getStatus().getStatus() + "(" + statistics.getStatusReason() + ")";
+ return currentStatistics.getStatus().getStatus()
+ + "("
+ + currentStatistics.getStatusReason()
+ + ")";
}
}
public NodeStatistics getStatistics() {
- return statistics;
+ return currentStatistics;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
similarity index 67%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ConfigNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
index 772adcc48b..cf86e92892 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
@@ -16,36 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node;
+package org.apache.iotdb.confignode.manager.node.heartbeat;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
public class ConfigNodeHeartbeatCache extends BaseNodeCache {
+ /** Only get CURRENT_NODE_ID here due to initialization order */
+ public static final int CURRENT_NODE_ID =
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+
public static final NodeStatistics CURRENT_NODE_STATISTICS =
new NodeStatistics(0, NodeStatus.Running, null);
- private final TConfigNodeLocation configNodeLocation;
+ private final int configNodeId;
/** Constructor for create ConfigNodeHeartbeatCache with default NodeStatistics */
- public ConfigNodeHeartbeatCache(TConfigNodeLocation configNodeLocation) {
+ public ConfigNodeHeartbeatCache(int configNodeId) {
super();
- this.configNodeLocation = configNodeLocation;
+ this.configNodeId = configNodeId;
}
- /** Constructor that only used when ConfigNode-leader switched */
- public ConfigNodeHeartbeatCache(
- TConfigNodeLocation configNodeLocation, NodeStatistics nodeStatistics) {
- this.configNodeLocation = configNodeLocation;
- this.statistics = nodeStatistics;
+ /** Constructor only for ConfigNode-leader */
+ public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) {
+ super();
+ this.configNodeId = configNodeId;
+ this.previousStatistics = statistics;
+ this.currentStatistics = statistics;
}
@Override
- public void updateNodeStatistics() {
+ protected void updateCurrentStatistics() {
// Skip itself
- if (configNodeLocation.getInternalEndPoint().equals(NodeManager.CURRENT_NODE)) {
+ if (configNodeId == CURRENT_NODE_ID) {
return;
}
@@ -71,9 +75,9 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
NodeStatistics newStatistics = new NodeStatistics(loadScore, status, null);
- if (!statistics.equals(newStatistics)) {
- // Update NodeStatistics if necessary
- statistics = newStatistics;
+ if (!currentStatistics.equals(newStatistics)) {
+ // Update the current NodeStatistics if necessary
+ currentStatistics = newStatistics;
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/DataNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
index 0a357f0b81..f9d5df7008 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node;
+package org.apache.iotdb.confignode.manager.node.heartbeat;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
public class DataNodeHeartbeatCache extends BaseNodeCache {
@@ -30,7 +29,7 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
}
@Override
- public void updateNodeStatistics() {
+ protected void updateCurrentStatistics() {
NodeHeartbeatSample lastSample = null;
synchronized (slidingWindow) {
if (slidingWindow.size() > 0) {
@@ -56,9 +55,9 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
NodeStatistics newStatistics = new NodeStatistics(loadScore, status, statusReason);
- if (!statistics.equals(newStatistics)) {
- // Update NodeStatistics if necessary
- statistics = newStatistics;
+ if (!currentStatistics.equals(newStatistics)) {
+ // Update the current NodeStatistics if necessary
+ currentStatistics = newStatistics;
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
index 7723aac40f..531a88dcc3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node;
+package org.apache.iotdb.confignode.manager.node.heartbeat;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
similarity index 96%
rename from confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
index eda6610a4f..627ac00a33 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.persistence.node;
+package org.apache.iotdb.confignode.manager.node.heartbeat;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -99,6 +98,10 @@ public class NodeStatistics {
return new NodeStatistics(Long.MAX_VALUE, NodeStatus.Unknown, null);
}
+ public NodeStatistics deepCopy() {
+ return new NodeStatistics(loadScore, status, statusReason);
+ }
+
public NodeHeartbeatSample convertToNodeHeartbeatSample() {
long currentTime = System.currentTimeMillis();
return new NodeHeartbeatSample(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index a7f474c410..7722a668b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -67,13 +67,12 @@ import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
@@ -795,55 +794,15 @@ public class PartitionManager {
: RegionGroupStatus.Disabled;
}
- /** Recover the regionGroupCacheMap when the ConfigNode-Leader is switched */
- public void recoverRegionGroupCacheMap() {
- Map<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsMap =
- partitionInfo.getRegionGroupStatisticsMap();
+ /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched */
+ public void initRegionGroupHeartbeatCache() {
regionGroupCacheMap.clear();
-
- LOGGER.info("[InheritLoadStatistics] Start to inherit RegionGroupStatistics...");
-
getAllReplicaSets()
.forEach(
- regionReplicaSet -> {
- TConsensusGroupId groupId = regionReplicaSet.getRegionId();
- if (regionGroupStatisticsMap.containsKey(groupId)) {
- regionGroupCacheMap.put(groupId, new RegionGroupCache(groupId));
- regionGroupCacheMap
- .get(groupId)
- .forceUpdate(
- regionGroupStatisticsMap.get(groupId).convertToRegionHeartbeatSampleMap());
- LOGGER.info(
- "[InheritLoadStatistics]\t {}={}",
- groupId,
- regionGroupCacheMap.get(groupId).getStatistics());
- }
- });
-
- LOGGER.info("[InheritLoadStatistics] Inherit RegionGroupStatistics finish");
- }
-
- /**
- * @param regionGroupId The specified RegionGroup's index
- * @param isLatest Is the RegionGroupStatistics latest
- * @return RegionGroupStatistics in RegionGroupCache if the isLatest is set to True,
- * RegionGroupStatistics in PartitionInfo otherwise
- */
- public RegionGroupStatistics getRegionGroupStatistics(
- TConsensusGroupId regionGroupId, boolean isLatest) {
- if (isLatest) {
- return regionGroupCacheMap.containsKey(regionGroupId)
- ? regionGroupCacheMap.get(regionGroupId).getStatistics()
- : RegionGroupStatistics.generateDefaultRegionGroupStatistics();
- } else {
- return partitionInfo.getRegionGroupStatisticsMap().containsKey(regionGroupId)
- ? partitionInfo.getRegionGroupStatisticsMap().get(regionGroupId)
- : RegionGroupStatistics.generateDefaultRegionGroupStatistics();
- }
- }
-
- public RegionRouteMap getRegionRouteMap() {
- return partitionInfo.getRegionRouteMap();
+ regionReplicaSet ->
+ regionGroupCacheMap.put(
+ regionReplicaSet.getRegionId(),
+ new RegionGroupCache(regionReplicaSet.getRegionId())));
}
public ScheduledExecutorService getRegionMaintainer() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
index e4e3a2a3b2..10f33e6300 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
@@ -16,17 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition;
+package org.apache.iotdb.confignode.manager.partition.heartbeat;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionStatistics;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.iotdb.confignode.manager.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
-import static org.apache.iotdb.confignode.manager.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
public class RegionCache {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
similarity index 66%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
index 56648d4979..acd3da603f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition;
+package org.apache.iotdb.confignode.manager.partition.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionStatistics;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import java.util.HashMap;
import java.util.Map;
@@ -34,16 +33,27 @@ public class RegionGroupCache {
// Map<DataNodeId(where a RegionReplica resides), RegionCache>
private final Map<Integer, RegionCache> regionCacheMap;
- private volatile RegionGroupStatistics statistics;
+ // The previous RegionGroupStatistics, used for comparing with
+ // the current RegionGroupStatistics to initiate notification when they are different
+ protected volatile RegionGroupStatistics previousStatistics;
+ // The current RegionGroupStatistics, used for providing statistics to other services
+ private volatile RegionGroupStatistics currentStatistics;
/** Constructor for create RegionGroupCache with default RegionGroupStatistics */
public RegionGroupCache(TConsensusGroupId consensusGroupId) {
this.consensusGroupId = consensusGroupId;
this.regionCacheMap = new ConcurrentHashMap<>();
- this.statistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
+ this.previousStatistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
+ this.currentStatistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
}
+ /**
+ * Cache the newest RegionHeartbeatSample
+ *
+ * @param dataNodeId Where the specified Region resides
+ * @param newHeartbeatSample The newest RegionHeartbeatSample
+ */
public void cacheHeartbeatSample(int dataNodeId, RegionHeartbeatSample newHeartbeatSample) {
regionCacheMap
.computeIfAbsent(dataNodeId, empty -> new RegionCache())
@@ -51,13 +61,47 @@ public class RegionGroupCache {
}
/**
- * Update RegionReplicas' statistics, including:
+ * Invoked periodically in the Cluster-LoadStatistics-Service to update currentStatistics and
+ * compare it with the previousStatistics, in order to detect whether the RegionGroup's
+ * statistics have changed
*
- * <p>1. RegionGroupStatus
+ * @return True if the currentStatistics has changed recently (compared with the
+ * previousStatistics), false otherwise
+ */
+ public boolean periodicUpdate() {
+ updateCurrentStatistics();
+ if (!currentStatistics.equals(previousStatistics)) {
+ previousStatistics = currentStatistics.deepCopy();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Actively append custom RegionHeartbeatSamples to force a change in the RegionGroupStatistics.
*
- * <p>2. RegionStatus
+ * <p>For example, this interface can be invoked in RegionGroup creating process to forcibly
+ * activate the corresponding RegionGroup's status to Available without waiting for heartbeat
+ * sampling
+ *
+ * <p>Notice: The ConfigNode-leader doesn't know the specified RegionGroup's statistics have
+ * changed even if this interface is invoked, since the ConfigNode-leader only detects cluster
+ * RegionGroups' statistics changes through the periodicUpdate interface. However, other services
+ * can still read the updated currentStatistics by invoking the getters below.
+ *
+ * @param newHeartbeatSamples Custom RegionHeartbeatSamples that will lead to needed
+ * RegionGroupStatistics
*/
- public void updateRegionGroupStatistics() {
+ public void forceUpdate(Map<Integer, RegionHeartbeatSample> newHeartbeatSamples) {
+ newHeartbeatSamples.forEach(this::cacheHeartbeatSample);
+ updateCurrentStatistics();
+ }
+
+ /**
+ * Update currentStatistics based on recent RegionHeartbeatSamples cached in the regionCacheMap
+ */
+ protected void updateCurrentStatistics() {
Map<Integer, RegionStatistics> regionStatisticsMap = new HashMap<>();
for (Map.Entry<Integer, RegionCache> cacheEntry : regionCacheMap.entrySet()) {
// Update RegionStatistics
@@ -70,9 +114,9 @@ public class RegionGroupCache {
RegionGroupStatistics newRegionGroupStatistics =
new RegionGroupStatistics(status, regionStatisticsMap);
- if (!statistics.equals(newRegionGroupStatistics)) {
+ if (!currentStatistics.equals(newRegionGroupStatistics)) {
// Update RegionGroupStatistics if necessary
- statistics = newRegionGroupStatistics;
+ currentStatistics = newRegionGroupStatistics;
}
}
@@ -103,27 +147,11 @@ public class RegionGroupCache {
}
}
- /**
- * Actively append custom NodeHeartbeatSamples to force a change in the RegionGroupStatistics.
- *
- * <p>For example, this interface can be invoked in RegionGroup creating process to forcibly
- * activate the corresponding RegionGroup's status to Available without waiting for heartbeat
- * sampling
- *
- * @param newHeartbeatSamples Custom RegionHeartbeatSamples that will lead to needed
- * RegionGroupStatistics
- */
- public void forceUpdate(Map<Integer, RegionHeartbeatSample> newHeartbeatSamples) {
- newHeartbeatSamples.forEach(this::cacheHeartbeatSample);
- updateRegionGroupStatistics();
- }
-
public void removeCacheIfExists(int dataNodeId) {
regionCacheMap.remove(dataNodeId);
}
- /** @return The latest RegionGroupStatistics of the current RegionGroup */
public RegionGroupStatistics getStatistics() {
- return statistics;
+ return currentStatistics;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
similarity index 85%
rename from confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
index ef018f5d3e..d36175bc67 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
@@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.persistence.partition.statistics;
+package org.apache.iotdb.confignode.manager.partition.heartbeat;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +37,7 @@ public class RegionGroupStatistics {
private final Map<Integer, RegionStatistics> regionStatisticsMap;
public RegionGroupStatistics() {
- this.regionStatisticsMap = new HashMap<>();
+ this.regionStatisticsMap = new ConcurrentHashMap<>();
}
public RegionGroupStatistics(
@@ -64,16 +62,19 @@ public class RegionGroupStatistics {
: RegionStatus.Unknown;
}
+ public Map<Integer, RegionStatistics> getRegionStatisticsMap() {
+ return regionStatisticsMap;
+ }
+
public static RegionGroupStatistics generateDefaultRegionGroupStatistics() {
- return new RegionGroupStatistics(RegionGroupStatus.Disabled, new HashMap<>());
+ return new RegionGroupStatistics(RegionGroupStatus.Disabled, new ConcurrentHashMap<>());
}
- public Map<Integer, RegionHeartbeatSample> convertToRegionHeartbeatSampleMap() {
- Map<Integer, RegionHeartbeatSample> result = new ConcurrentHashMap<>();
+ public RegionGroupStatistics deepCopy() {
+ Map<Integer, RegionStatistics> deepCopyMap = new ConcurrentHashMap<>();
regionStatisticsMap.forEach(
- (dataNodeId, regionStatistics) ->
- result.put(dataNodeId, regionStatistics.convertToRegionHeartbeatSample()));
- return result;
+ (dataNodeId, regionStatistics) -> deepCopyMap.put(dataNodeId, regionStatistics.deepCopy()));
+ return new RegionGroupStatistics(regionGroupStatus, deepCopyMap);
}
public void serialize(OutputStream stream) throws IOException {
@@ -129,11 +130,6 @@ public class RegionGroupStatistics {
@Override
public String toString() {
- return "RegionGroupStatistics{"
- + "regionGroupStatus="
- + regionGroupStatus
- + ", regionStatisticsMap="
- + regionStatisticsMap
- + '}';
+ return "RegionGroupStatistics{" + "regionGroupStatus=" + regionGroupStatus + '}';
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
index 2ce3b21d3e..8de58a5e93 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition;
+package org.apache.iotdb.confignode.manager.partition.heartbeat;
import org.apache.iotdb.commons.cluster.RegionStatus;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
index 2d77498df1..d30bf43d96 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.persistence.partition.statistics;
+package org.apache.iotdb.confignode.manager.partition.heartbeat;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
@@ -44,6 +43,10 @@ public class RegionStatistics {
return regionStatus;
}
+ public RegionStatistics deepCopy() {
+ return new RegionStatistics(regionStatus);
+ }
+
public RegionHeartbeatSample convertToRegionHeartbeatSample() {
long currentTime = System.currentTimeMillis();
return new RegionHeartbeatSample(currentTime, currentTime, regionStatus);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index f5fb450ee1..b08e4059ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -62,7 +62,6 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProce
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
@@ -366,13 +365,6 @@ public class ConfigPlanExecutor {
return cqInfo.activeCQ((ActiveCQPlan) physicalPlan);
case UPDATE_CQ_LAST_EXEC_TIME:
return cqInfo.updateCQLastExecutionTime((UpdateCQLastExecTimePlan) physicalPlan);
- case UpdateLoadStatistics:
- LOGGER.info(
- "[UpdateLoadStatistics] Update cluster load statistics, timestamp: {}",
- System.currentTimeMillis());
- nodeInfo.updateNodeStatistics((UpdateLoadStatisticsPlan) physicalPlan);
- return partitionInfo.updateRegionGroupStatisticsAndRegionRouteMap(
- (UpdateLoadStatisticsPlan) physicalPlan);
default:
throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 1ccc80ba58..d03fffa7f9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -85,9 +84,6 @@ public class NodeInfo implements SnapshotProcessor {
private final AtomicInteger nextNodeId = new AtomicInteger(-1);
private final Map<Integer, TDataNodeConfiguration> registeredDataNodes;
- // Node Statistics
- private final Map<Integer, NodeStatistics> nodeStatisticsMap;
-
private final String snapshotFileName = "node_info.bin";
public NodeInfo() {
@@ -96,8 +92,6 @@ public class NodeInfo implements SnapshotProcessor {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.registeredDataNodes = new ConcurrentHashMap<>();
-
- this.nodeStatisticsMap = new ConcurrentHashMap<>();
}
/**
@@ -373,33 +367,6 @@ public class NodeInfo implements SnapshotProcessor {
return nextNodeId.incrementAndGet();
}
- /**
- * Update NodeStatistics through consensus-write
- *
- * @param updateLoadStatisticsPlan UpdateLoadStatisticsPlan
- */
- public void updateNodeStatistics(UpdateLoadStatisticsPlan updateLoadStatisticsPlan) {
- if (!updateLoadStatisticsPlan.getNodeStatisticsMap().isEmpty()) {
- synchronized (nodeStatisticsMap) {
- // Update nodeStatisticsMap
- nodeStatisticsMap.putAll(updateLoadStatisticsPlan.getNodeStatisticsMap());
- // Log current NodeStatistics
- LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
- for (Map.Entry<Integer, NodeStatistics> nodeCacheEntry : nodeStatisticsMap.entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- "nodeId{" + nodeCacheEntry.getKey() + "}",
- nodeCacheEntry.getValue());
- }
- }
- }
- }
-
- /** Only used when the ConfigNode-Leader is switched */
- public Map<Integer, NodeStatistics> getNodeStatisticsMap() {
- return nodeStatisticsMap;
- }
-
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
File snapshotFile = new File(snapshotDir, snapshotFileName);
@@ -424,8 +391,6 @@ public class NodeInfo implements SnapshotProcessor {
serializeRegisteredDataNode(fileOutputStream, protocol);
- serializeNodeStatistics(fileOutputStream);
-
fileOutputStream.flush();
fileOutputStream.close();
@@ -464,14 +429,6 @@ public class NodeInfo implements SnapshotProcessor {
}
}
- private void serializeNodeStatistics(OutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(nodeStatisticsMap.size(), outputStream);
- for (Map.Entry<Integer, NodeStatistics> nodeStatisticsEntry : nodeStatisticsMap.entrySet()) {
- ReadWriteIOUtils.write(nodeStatisticsEntry.getKey(), outputStream);
- nodeStatisticsEntry.getValue().serialize(outputStream);
- }
- }
-
@Override
public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
@@ -498,8 +455,6 @@ public class NodeInfo implements SnapshotProcessor {
deserializeRegisteredDataNode(fileInputStream, protocol);
- deserializeNodeStatistics(fileInputStream);
-
} finally {
configNodeInfoReadWriteLock.writeLock().unlock();
dataNodeInfoReadWriteLock.writeLock().unlock();
@@ -530,17 +485,6 @@ public class NodeInfo implements SnapshotProcessor {
}
}
- private void deserializeNodeStatistics(InputStream inputStream) throws IOException {
- int size = ReadWriteIOUtils.readInt(inputStream);
- while (size > 0) {
- int nodeId = ReadWriteIOUtils.readInt(inputStream);
- NodeStatistics nodeStatistics = new NodeStatistics();
- nodeStatistics.deserialize(inputStream);
- nodeStatisticsMap.put(nodeId, nodeStatistics);
- size--;
- }
- }
-
public static int getMinimumDataNode() {
return minimumDataNode;
}
@@ -549,7 +493,6 @@ public class NodeInfo implements SnapshotProcessor {
nextNodeId.set(-1);
registeredDataNodes.clear();
registeredConfigNodes.clear();
- nodeStatisticsMap.clear();
}
@Override
@@ -559,12 +502,11 @@ public class NodeInfo implements SnapshotProcessor {
NodeInfo nodeInfo = (NodeInfo) o;
return registeredConfigNodes.equals(nodeInfo.registeredConfigNodes)
&& nextNodeId.get() == nodeInfo.nextNodeId.get()
- && registeredDataNodes.equals(nodeInfo.registeredDataNodes)
- && nodeStatisticsMap.equals(nodeInfo.nodeStatisticsMap);
+ && registeredDataNodes.equals(nodeInfo.registeredDataNodes);
}
@Override
public int hashCode() {
- return Objects.hash(registeredConfigNodes, nextNodeId, registeredDataNodes, nodeStatisticsMap);
+ return Objects.hash(registeredConfigNodes, nextNodeId, registeredDataNodes);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index f620f4fcf8..812393b0a2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
@@ -55,10 +54,8 @@ import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.consensus.common.DataSet;
@@ -113,12 +110,6 @@ public class PartitionInfo implements SnapshotProcessor {
// For RegionReplicas' asynchronous management
private final List<RegionMaintainTask> regionMaintainTaskList;
- /** For Load-Balancing */
- // Map<RegionGroupId, RegionGroupStatistics>
- private final Map<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsMap;
-
- private final RegionRouteMap regionRouteMap;
-
private final String snapshotFileName = "partition_info.bin";
public PartitionInfo() {
@@ -126,9 +117,6 @@ public class PartitionInfo implements SnapshotProcessor {
this.storageGroupPartitionTables = new ConcurrentHashMap<>();
this.regionMaintainTaskList = Collections.synchronizedList(new ArrayList<>());
-
- this.regionGroupStatisticsMap = new ConcurrentHashMap<>();
- this.regionRouteMap = new RegionRouteMap();
}
public int generateNextRegionGroupId() {
@@ -712,73 +700,6 @@ public class PartitionInfo implements SnapshotProcessor {
return result;
}
- /**
- * Update RegionGroupStatistics through consensus-write
- *
- * @param updateLoadStatisticsPlan UpdateLoadStatisticsPlan
- */
- public TSStatus updateRegionGroupStatisticsAndRegionRouteMap(
- UpdateLoadStatisticsPlan updateLoadStatisticsPlan) {
- if (!updateLoadStatisticsPlan.getRegionGroupStatisticsMap().isEmpty()) {
- synchronized (regionGroupStatisticsMap) {
- // Update regionGroupStatisticsMap
- regionGroupStatisticsMap.putAll(updateLoadStatisticsPlan.getRegionGroupStatisticsMap());
- // Log current RegionGroupStatistics
- LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
- for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
- regionGroupStatisticsMap.entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- regionGroupStatisticsEntry.getKey(),
- regionGroupStatisticsEntry.getValue());
- }
- }
- }
-
- if (!updateLoadStatisticsPlan.getRegionRouteMap().isEmpty()) {
- synchronized (regionRouteMap) {
- // Update regionLeaderMap
- regionRouteMap
- .getRegionLeaderMap()
- .putAll(updateLoadStatisticsPlan.getRegionRouteMap().getRegionLeaderMap());
- // Log current regionLeaderMap
- LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
- for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
- regionRouteMap.getRegionLeaderMap().entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- regionLeaderEntry.getKey(),
- regionLeaderEntry.getValue());
- }
-
- // Update regionPriorityMap
- regionRouteMap
- .getRegionPriorityMap()
- .putAll(updateLoadStatisticsPlan.getRegionRouteMap().getRegionPriorityMap());
- // Log current regionPriorityMap
- LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
- for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
- regionRouteMap.getRegionPriorityMap().entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- regionPriorityEntry.getKey(),
- regionPriorityEntry.getValue());
- }
- }
- }
-
- return RpcUtils.SUCCESS_STATUS;
- }
-
- /** Only used when the ConfigNode-Leader is switched */
- public Map<TConsensusGroupId, RegionGroupStatistics> getRegionGroupStatisticsMap() {
- return regionGroupStatisticsMap;
- }
-
- public RegionRouteMap getRegionRouteMap() {
- return regionRouteMap;
- }
-
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
@@ -815,17 +736,6 @@ public class PartitionInfo implements SnapshotProcessor {
task.serialize(fileOutputStream, protocol);
}
- // serialize RegionGroupStatistics
- ReadWriteIOUtils.write(regionGroupStatisticsMap.size(), fileOutputStream);
- for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
- regionGroupStatisticsMap.entrySet()) {
- regionGroupStatisticsEntry.getKey().write(protocol);
- regionGroupStatisticsEntry.getValue().serialize(fileOutputStream);
- }
-
- // serialize RegionRouteMap
- regionRouteMap.serialize(fileOutputStream, protocol);
-
// write to file
fileOutputStream.flush();
fileOutputStream.close();
@@ -882,19 +792,6 @@ public class PartitionInfo implements SnapshotProcessor {
RegionMaintainTask task = RegionMaintainTask.Factory.create(fileInputStream, protocol);
regionMaintainTaskList.add(task);
}
-
- // restore RegionGroupStatistics
- length = ReadWriteIOUtils.readInt(fileInputStream);
- for (int i = 0; i < length; i++) {
- TConsensusGroupId groupId = new TConsensusGroupId();
- groupId.read(protocol);
- RegionGroupStatistics regionGroupStatistics = new RegionGroupStatistics();
- regionGroupStatistics.deserialize(fileInputStream);
- regionGroupStatisticsMap.put(groupId, regionGroupStatistics);
- }
-
- // restore RegionRouteMap
- regionRouteMap.deserialize(fileInputStream, protocol);
}
}
@@ -944,8 +841,6 @@ public class PartitionInfo implements SnapshotProcessor {
nextRegionGroupId.set(-1);
storageGroupPartitionTables.clear();
regionMaintainTaskList.clear();
- regionGroupStatisticsMap.clear();
- regionRouteMap.clear();
}
@Override
@@ -955,18 +850,11 @@ public class PartitionInfo implements SnapshotProcessor {
PartitionInfo that = (PartitionInfo) o;
return nextRegionGroupId.get() == that.nextRegionGroupId.get()
&& storageGroupPartitionTables.equals(that.storageGroupPartitionTables)
- && regionMaintainTaskList.equals(that.regionMaintainTaskList)
- && regionGroupStatisticsMap.equals(that.regionGroupStatisticsMap)
- && regionRouteMap.equals(that.regionRouteMap);
+ && regionMaintainTaskList.equals(that.regionMaintainTaskList);
}
@Override
public int hashCode() {
- return Objects.hash(
- nextRegionGroupId,
- storageGroupPartitionTables,
- regionMaintainTaskList,
- regionGroupStatisticsMap,
- regionRouteMap);
+ return Objects.hash(nextRegionGroupId, storageGroupPartitionTables, regionMaintainTaskList);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 414d8aa548..c844411911 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.persistence.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index debbc5a5c3..0820a54851 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -47,11 +47,11 @@ import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index a2ad2e2cda..6577653e38 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.consensus.ConsensusFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/OperatePipeProcedureRollbackProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/OperatePipeProcedureRollbackProcessor.java
index e23405bda3..9f30929df5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/OperatePipeProcedureRollbackProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/OperatePipeProcedureRollbackProcessor.java
@@ -26,8 +26,8 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index afc54e913f..255eab7da0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -91,6 +91,7 @@ public class ConfigNode implements ConfigNodeMBean {
/* Restart */
if (SystemPropertiesUtils.isRestarted()) {
LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
+ /* Always set ConfigNodeId before initConsensusManager */
CONF.setConfigNodeId(SystemPropertiesUtils.loadConfigNodeIdWhenRestarted());
configManager.initConsensusManager();
setUpRPCService();
@@ -105,7 +106,7 @@ public class ConfigNode implements ConfigNodeMBean {
"The current {} is now starting as the Seed-ConfigNode.",
ConfigNodeConstant.GLOBAL_NAME);
- // Init consensusGroup
+ /* Always set ConfigNodeId before initConsensusManager */
CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
configManager.initConsensusManager();
@@ -249,6 +250,7 @@ public class ConfigNode implements ConfigNodeMBean {
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ /* Always set ConfigNodeId before initConsensusManager */
CONF.setConfigNodeId(resp.getConfigNodeId());
configManager.initConsensusManager();
return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index f4a13b3478..7fc03abc08 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -65,7 +65,6 @@ import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
@@ -602,11 +601,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
@Override
public TRegionRouteMapResp getLatestRegionRouteMap() {
- TRegionRouteMapResp resp = configManager.getLatestRegionRouteMap();
- if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LoadManager.printRegionRouteMap(resp.getTimestamp(), resp.getRegionRouteMap());
- }
- return resp;
+ return configManager.getLatestRegionRouteMap();
}
@Override
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 2c229f592d..66a9a60350 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -30,8 +30,6 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
@@ -81,7 +79,6 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProce
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
@@ -106,13 +103,8 @@ import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTrigger
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionStatistics;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
@@ -1178,58 +1170,6 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(getSeriesSlotListPlan0, getSeriesSlotListPlan1);
}
- @Test
- public void UpdateLoadStatisticsPlanTest() throws IOException {
- UpdateLoadStatisticsPlan updateLoadStatisticsPlan0 = new UpdateLoadStatisticsPlan();
-
- // Set NodeStatistics
- for (int i = 0; i < 3; i++) {
- updateLoadStatisticsPlan0.putNodeStatistics(
- i, new NodeStatistics(i, NodeStatus.Running, null));
- }
-
- // Set RegionGroupStatistics
- for (int i = 0; i < 10; i++) {
- Map<Integer, RegionStatistics> regionStatisticsMap = new HashMap<>();
- for (int j = 0; j < 3; j++) {
- regionStatisticsMap.put(j, new RegionStatistics(RegionStatus.Unknown));
- }
- updateLoadStatisticsPlan0.putRegionGroupStatistics(
- new TConsensusGroupId(DataRegion, i),
- new RegionGroupStatistics(RegionGroupStatus.Available, regionStatisticsMap));
- }
-
- // Set RegionRouteMap
- RegionRouteMap regionRouteMap = new RegionRouteMap();
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
- Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- TConsensusGroupId regionGroupId = new TConsensusGroupId(SchemaRegion, i);
- TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
- regionReplicaSet.setRegionId(regionGroupId);
- for (int j = 0; j < 3; j++) {
- regionReplicaSet.addToDataNodeLocations(
- new TDataNodeLocation(
- j,
- new TEndPoint("0.0.0.0", 6667 + j),
- new TEndPoint("0.0.0.0", 9003 + j),
- new TEndPoint("0.0.0.0", 8777 + j),
- new TEndPoint("0.0.0.0", 40010 + j),
- new TEndPoint("0.0.0.0", 50010 + j)));
- }
- regionLeaderMap.put(regionGroupId, i % 3);
- regionPriorityMap.put(regionGroupId, regionReplicaSet);
- }
- regionRouteMap.setRegionLeaderMap(regionLeaderMap);
- regionRouteMap.setRegionPriorityMap(regionPriorityMap);
- updateLoadStatisticsPlan0.setRegionRouteMap(regionRouteMap);
-
- UpdateLoadStatisticsPlan updateLoadStatisticsPlan1 =
- (UpdateLoadStatisticsPlan)
- ConfigPhysicalPlan.Factory.create(updateLoadStatisticsPlan0.serializeToByteBuffer());
- Assert.assertEquals(updateLoadStatisticsPlan0, updateLoadStatisticsPlan1);
- }
-
@Test
public void RemoveDataNodePlanTest() throws IOException {
List<TDataNodeLocation> locations = new ArrayList<>();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 63b13c8497..b369fedc6b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
@@ -70,7 +70,7 @@ public class LeaderRouterTest {
currentTimeMillis));
}
}
- nodeCacheMap.values().forEach(BaseNodeCache::updateNodeStatistics);
+ nodeCacheMap.values().forEach(BaseNodeCache::periodicUpdate);
// Get the loadScoreMap
Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
index aca4a83f95..042a80ce23 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
@@ -72,7 +72,7 @@ public class LoadScoreGreedyRouterTest {
new THeartbeatResp(currentTimeMillis, statuses[i].getStatus()),
currentTimeMillis));
}
- nodeCacheMap.values().forEach(BaseNodeCache::updateNodeStatistics);
+ nodeCacheMap.values().forEach(BaseNodeCache::periodicUpdate);
/* Get the loadScoreMap */
Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index fd2e255b74..4d93b6355b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
@@ -30,8 +32,7 @@ public class NodeCacheTest {
public void forceUpdateTest() {
DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
- // Test update to UnknownStatus
- dataNodeHeartbeatCache.updateNodeStatistics();
+ // Test default
Assert.assertEquals(NodeStatus.Unknown, dataNodeHeartbeatCache.getNodeStatus());
Assert.assertEquals(Long.MAX_VALUE, dataNodeHeartbeatCache.getLoadScore());
@@ -51,4 +52,16 @@ public class NodeCacheTest {
Assert.assertEquals(NodeStatus.ReadOnly, dataNodeHeartbeatCache.getNodeStatus());
Assert.assertEquals(Long.MAX_VALUE, dataNodeHeartbeatCache.getLoadScore());
}
+
+ @Test
+ public void periodicUpdateTest() {
+ DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+ long currentTime = System.currentTimeMillis();
+ dataNodeHeartbeatCache.cacheHeartbeatSample(
+ new NodeHeartbeatSample(
+ new THeartbeatResp(currentTime, NodeStatus.Running.getStatus()), currentTime));
+ Assert.assertTrue(dataNodeHeartbeatCache.periodicUpdate());
+ Assert.assertEquals(NodeStatus.Running, dataNodeHeartbeatCache.getNodeStatus());
+ Assert.assertEquals(0, dataNodeHeartbeatCache.getLoadScore());
+ }
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
index a204ca84c3..69c228e14b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -21,10 +21,15 @@ package org.apache.iotdb.confignode.manager.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
public class RegionGroupCacheTest {
@Test
@@ -40,7 +45,7 @@ public class RegionGroupCacheTest {
2, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Removing));
regionGroupCache.cacheHeartbeatSample(
3, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.ReadOnly));
- regionGroupCache.updateRegionGroupStatistics();
+ Assert.assertTrue(regionGroupCache.periodicUpdate());
Assert.assertEquals(RegionStatus.Running, regionGroupCache.getStatistics().getRegionStatus(0));
Assert.assertEquals(RegionStatus.Unknown, regionGroupCache.getStatistics().getRegionStatus(1));
@@ -59,7 +64,7 @@ public class RegionGroupCacheTest {
1, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Running));
runningRegionGroup.cacheHeartbeatSample(
2, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Running));
- runningRegionGroup.updateRegionGroupStatistics();
+ Assert.assertTrue(runningRegionGroup.periodicUpdate());
Assert.assertEquals(
RegionGroupStatus.Running, runningRegionGroup.getStatistics().getRegionGroupStatus());
@@ -71,7 +76,7 @@ public class RegionGroupCacheTest {
1, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Unknown));
availableRegionGroup.cacheHeartbeatSample(
2, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Running));
- availableRegionGroup.updateRegionGroupStatistics();
+ Assert.assertTrue(availableRegionGroup.periodicUpdate());
Assert.assertEquals(
RegionGroupStatus.Available, availableRegionGroup.getStatistics().getRegionGroupStatus());
@@ -83,7 +88,7 @@ public class RegionGroupCacheTest {
1, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.ReadOnly));
disabledRegionGroup0.cacheHeartbeatSample(
2, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Running));
- disabledRegionGroup0.updateRegionGroupStatistics();
+ Assert.assertTrue(disabledRegionGroup0.periodicUpdate());
Assert.assertEquals(
RegionGroupStatus.Disabled, disabledRegionGroup0.getStatistics().getRegionGroupStatus());
@@ -95,7 +100,7 @@ public class RegionGroupCacheTest {
1, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Unknown));
disabledRegionGroup1.cacheHeartbeatSample(
2, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Unknown));
- disabledRegionGroup1.updateRegionGroupStatistics();
+ Assert.assertTrue(disabledRegionGroup1.periodicUpdate());
Assert.assertEquals(
RegionGroupStatus.Disabled, disabledRegionGroup1.getStatistics().getRegionGroupStatus());
@@ -107,8 +112,24 @@ public class RegionGroupCacheTest {
1, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Running));
disabledRegionGroup2.cacheHeartbeatSample(
2, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Removing));
- disabledRegionGroup2.updateRegionGroupStatistics();
+ Assert.assertTrue(disabledRegionGroup2.periodicUpdate());
Assert.assertEquals(
RegionGroupStatus.Disabled, disabledRegionGroup2.getStatistics().getRegionGroupStatus());
}
+
+ @Test
+ public void forceUpdateTest() {
+ long currentTime = System.currentTimeMillis();
+ Map<Integer, RegionHeartbeatSample> heartbeatSampleMap = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ heartbeatSampleMap.put(
+ i, new RegionHeartbeatSample(currentTime, currentTime, RegionStatus.Running));
+ }
+
+ RegionGroupCache regionGroupCache =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 10));
+ regionGroupCache.forceUpdate(heartbeatSampleMap);
+ Assert.assertEquals(
+ RegionGroupStatus.Running, regionGroupCache.getStatistics().getRegionGroupStatus());
+ }
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index 6baf194880..287f4f8d51 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -23,12 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
-import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
-import org.apache.iotdb.confignode.persistence.node.NodeStatistics;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
@@ -67,7 +64,6 @@ public class NodeInfoTest {
public void testSnapshot() throws TException, IOException {
registerConfigNodes();
registerDataNodes();
- recordNodeStatistics();
nodeInfo.processTakeSnapshot(snapshotDir);
NodeInfo nodeInfo1 = new NodeInfo();
@@ -110,13 +106,4 @@ public class NodeInfoTest {
new TEndPoint("127.0.0.1", 9900 + flag),
new TEndPoint("127.0.0.1", 11000 + flag));
}
-
- private void recordNodeStatistics() {
- UpdateLoadStatisticsPlan updateLoadStatisticsPlan = new UpdateLoadStatisticsPlan();
- for (int i = 0; i < 6; i++) {
- updateLoadStatisticsPlan.putNodeStatistics(
- i, new NodeStatistics(i, NodeStatus.Running, null));
- }
- nodeInfo.updateNodeStatistics(updateLoadStatisticsPlan);
- }
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index eeda986f7e..3e8f28484d 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
@@ -35,16 +34,11 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataP
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
-import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionGroupStatistics;
-import org.apache.iotdb.confignode.persistence.partition.statistics.RegionStatistics;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -63,7 +57,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
public class PartitionInfoTest {
@@ -148,9 +141,6 @@ public class PartitionInfoTest {
partitionInfo.offerRegionMaintainTasks(generateOfferRegionMaintainTasksPlan());
- partitionInfo.updateRegionGroupStatisticsAndRegionRouteMap(
- generateUpdateLoadStatisticsPlan(schemaRegionReplicaSet, dataRegionReplicaSet));
-
partitionInfo.processTakeSnapshot(snapshotDir);
PartitionInfo partitionInfo1 = new PartitionInfo();
@@ -278,39 +268,6 @@ public class PartitionInfoTest {
return offerPlan;
}
- private UpdateLoadStatisticsPlan generateUpdateLoadStatisticsPlan(
- TRegionReplicaSet schemaRegionReplicaSet, TRegionReplicaSet dataRegionReplicaSet) {
- UpdateLoadStatisticsPlan updateLoadStatisticsPlan = new UpdateLoadStatisticsPlan();
-
- // Build RegionGroupStatistics
- for (int i = 0; i < 10; i++) {
- Map<Integer, RegionStatistics> regionStatisticsMap = new HashMap<>();
- for (int j = 0; j < 3; j++) {
- regionStatisticsMap.put(j, new RegionStatistics(RegionStatus.Unknown));
- }
- updateLoadStatisticsPlan.putRegionGroupStatistics(
- new TConsensusGroupId(DataRegion, i),
- new RegionGroupStatistics(RegionGroupStatus.Available, regionStatisticsMap));
- }
-
- // Build RegionRouteMap
- RegionRouteMap regionRouteMap = new RegionRouteMap();
-
- Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>();
- regionLeaderMap.put(schemaRegionReplicaSet.getRegionId(), 0);
- regionLeaderMap.put(dataRegionReplicaSet.getRegionId(), 1);
- regionRouteMap.setRegionLeaderMap(regionLeaderMap);
-
- Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new HashMap<>();
- regionPriorityMap.put(schemaRegionReplicaSet.getRegionId(), schemaRegionReplicaSet);
- regionPriorityMap.put(dataRegionReplicaSet.getRegionId(), dataRegionReplicaSet);
- regionRouteMap.setRegionPriorityMap(regionPriorityMap);
-
- updateLoadStatisticsPlan.setRegionRouteMap(regionRouteMap);
-
- return updateLoadStatisticsPlan;
- }
-
private CreateSchemaPartitionPlan generateCreateSchemaPartitionReq(
int startFlag, TConsensusGroupId tConsensusGroupId) {
CreateSchemaPartitionPlan createSchemaPartitionPlan = new CreateSchemaPartitionPlan();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
index 2524bd89ad..4181fa9d67 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.persistence.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
index 3a53fbfcb4..cce3ed250a 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.confignode.persistence.partition.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
index 67ca6959ec..8ccf87c7c2 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.persistence.partition.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index f744a21157..5abde57eae 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -65,6 +65,7 @@
<allow class="org.apache.iotdb.commons.udf.UDFInformation" />
<allow class="org.apache.iotdb.commons.cq.CQState" />
<allow class="org.apache.iotdb.consensus.ConsensusFactory" />
+ <allow class="org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils" />
<allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true" />
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorizeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java
similarity index 99%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorizeIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java
index ecfef1f8c6..7fc113ec3e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorizeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java
@@ -51,7 +51,7 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBClusterAuthorizeIT {
+public class IoTDBClusterAuthorityIT {
@Before
public void setUp() throws Exception {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
index 98307358b6..e71525fec1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
@@ -18,24 +18,17 @@
*/
package org.apache.iotdb.confignode.it;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
@@ -43,7 +36,6 @@ import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.thrift.TException;
import org.junit.After;
@@ -126,22 +118,8 @@ public class IoTDBConfigNodeSwitchLeaderIT {
TimeUnit.MILLISECONDS.sleep(partitionRegionRatisRPCLeaderElectionTimeoutMaxMs);
}
- /** Generate a PatternTree and serialize it into a ByteBuffer */
- private ByteBuffer generatePatternTreeBuffer(String[] paths)
- throws IllegalPathException, IOException {
- PathPatternTree patternTree = new PathPatternTree();
- for (String path : paths) {
- patternTree.appendPathPattern(new PartialPath(path));
- }
- patternTree.constructTree();
-
- PublicBAOS baos = new PublicBAOS();
- patternTree.serialize(baos);
- return ByteBuffer.wrap(baos.toByteArray());
- }
-
@Test
- public void loadStatisticsInheritIT()
+ public void basicDataInheritIT()
throws IOException, TException, IllegalPathException, InterruptedException {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -153,8 +131,6 @@ public class IoTDBConfigNodeSwitchLeaderIT {
TSStatus status;
TSchemaPartitionTableResp schemaPartitionTableResp0;
TDataPartitionTableResp dataPartitionTableResp0;
- TShowDataNodesResp showDataNodesResp0;
- Map<TConsensusGroupId, TRegionInfo> dataRegionInfoMap = new HashMap<>();
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -166,7 +142,8 @@ public class IoTDBConfigNodeSwitchLeaderIT {
// Create SchemaRegionGroups through getOrCreateSchemaPartition and record
// SchemaPartitionTable
- ByteBuffer buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+ ByteBuffer buffer =
+ ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
schemaPartitionTableResp0 =
client.getOrCreateSchemaPartitionTable(
new TSchemaPartitionReq().setPathPatternTree(buffer));
@@ -186,24 +163,6 @@ public class IoTDBConfigNodeSwitchLeaderIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
dataPartitionTableResp0.getStatus().getCode());
-
- // Sleep to wait for UpdateLoadStatistics
- TimeUnit.MILLISECONDS.sleep(partitionRegionRatisRPCLeaderElectionTimeoutMaxMs);
-
- // Record showDataNodesResp
- showDataNodesResp0 = client.showDataNodes();
-
- // Record RegionInfo of DataRegions
- TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- if (TConsensusGroupType.DataRegion.equals(
- regionInfo.getConsensusGroupId().getType())) {
- dataRegionInfoMap.put(regionInfo.getConsensusGroupId(), regionInfo);
- }
- });
}
// Switch the current ConfigNode-Leader
@@ -212,7 +171,8 @@ public class IoTDBConfigNodeSwitchLeaderIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// Check SchemaPartitionTable
- ByteBuffer buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+ ByteBuffer buffer =
+ ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
Assert.assertEquals(
schemaPartitionTableResp0,
client.getSchemaPartitionTable(new TSchemaPartitionReq().setPathPatternTree(buffer)));
@@ -226,23 +186,6 @@ public class IoTDBConfigNodeSwitchLeaderIT {
sgSlotsMap.put(sg1, seriesSlotMap);
Assert.assertEquals(
dataPartitionTableResp0, client.getDataPartitionTable(new TDataPartitionReq(sgSlotsMap)));
-
- // Check DataNodes' statuses
- Assert.assertEquals(showDataNodesResp0, client.showDataNodes());
-
- // Check DataRegions' RegionInfo
- TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
- Map<TConsensusGroupId, TRegionInfo> dataRegionInfoMap1 = new HashMap<>();
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- if (TConsensusGroupType.DataRegion.equals(
- regionInfo.getConsensusGroupId().getType())) {
- dataRegionInfoMap1.put(regionInfo.getConsensusGroupId(), regionInfo);
- }
- });
- Assert.assertEquals(dataRegionInfoMap, dataRegionInfoMap1);
}
}
}