You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/18 01:25:45 UTC
[iotdb] branch master updated: [IOTDB-3839] Fix genRealTimeRegionRoutingMap bug (#6685)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9eaa804235 [IOTDB-3839] Fix genRealTimeRegionRoutingMap bug (#6685)
9eaa804235 is described below
commit 9eaa804235069227c3f35779df2cde23966714c4
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon Jul 18 09:25:40 2022 +0800
[IOTDB-3839] Fix genRealTimeRegionRoutingMap bug (#6685)
---
.../iotdb/confignode/manager/ConfigManager.java | 14 ++-
.../iotdb/confignode/manager/load/LoadManager.java | 10 +-
.../manager/load/balancer/RouteBalancer.java | 4 +-
.../manager/load/balancer/router/IRouter.java | 2 +-
.../manager/load/balancer/router/LeaderRouter.java | 8 +-
.../balancer/router/LoadScoreGreedyRouter.java | 8 +-
.../load/heartbeat/ConfigNodeHeartbeatCache.java | 4 +-
.../load/heartbeat/DataNodeHeartbeatCache.java | 6 +-
.../manager/load/heartbeat/INodeCache.java | 8 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 4 +-
.../load/balancer/router/LeaderRouterTest.java | 119 ++++++++++++---------
.../balancer/router/LoadScoreGreedyRouterTest.java | 110 ++++++++++---------
12 files changed, 166 insertions(+), 131 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index d579da2161..4c2463775f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -422,8 +422,7 @@ public class ConfigManager implements IManager {
(SchemaPartitionResp) partitionManager.getSchemaPartition(getSchemaPartitionPlan);
if (isContainedReplicaSet) {
resp =
- queryResult.convertToRpcSchemaPartitionResp(
- getLoadManager().genRealTimeRoutingPolicy());
+ queryResult.convertToRpcSchemaPartitionResp(getLoadManager().genLatestRegionRouteMap());
} else {
resp = queryResult.convertToRpcSchemaPartitionTableResp();
}
@@ -480,8 +479,7 @@ public class ConfigManager implements IManager {
partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
if (isContainedReplicaSet) {
resp =
- queryResult.convertToRpcSchemaPartitionResp(
- getLoadManager().genRealTimeRoutingPolicy());
+ queryResult.convertToRpcSchemaPartitionResp(getLoadManager().genLatestRegionRouteMap());
} else {
resp = queryResult.convertToRpcSchemaPartitionTableResp();
}
@@ -516,7 +514,7 @@ public class ConfigManager implements IManager {
partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
TSchemaNodeManagementResp result =
resp.convertToRpcSchemaNodeManagementPartitionResp(
- getLoadManager().genRealTimeRoutingPolicy());
+ getLoadManager().genLatestRegionRouteMap());
// TODO: Delete or hide this LOGGER before officially release.
LOGGER.info(
@@ -548,7 +546,7 @@ public class ConfigManager implements IManager {
(DataPartitionResp) partitionManager.getDataPartition(getDataPartitionPlan);
if (isContainedReplicaSet) {
- resp = queryResult.convertToTDataPartitionResp(getLoadManager().genRealTimeRoutingPolicy());
+ resp = queryResult.convertToTDataPartitionResp(getLoadManager().genLatestRegionRouteMap());
} else {
resp = queryResult.convertToTDataPartitionTableResp();
}
@@ -587,7 +585,7 @@ public class ConfigManager implements IManager {
partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionReq);
if (isContainedReplicaSet) {
- resp = queryResult.convertToTDataPartitionResp(getLoadManager().genRealTimeRoutingPolicy());
+ resp = queryResult.convertToTDataPartitionResp(getLoadManager().genLatestRegionRouteMap());
} else {
resp = queryResult.convertToTDataPartitionTableResp();
}
@@ -848,7 +846,7 @@ public class ConfigManager implements IManager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
resp.setTimestamp(System.currentTimeMillis());
- resp.setRegionRouteMap(getLoadManager().genRealTimeRoutingPolicy());
+ resp.setRegionRouteMap(getLoadManager().genLatestRegionRouteMap());
}
return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 6beed810d3..ded047228f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -182,8 +182,8 @@ public class LoadManager {
* for each Region is based on the order in the TRegionReplicaSet. The replica with higher
* sorting result have higher priority.
*/
- public Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy() {
- return routeBalancer.genRealTimeRoutingPolicy(getPartitionManager().getAllReplicaSets());
+ public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap() {
+ return routeBalancer.genLatestRegionRouteMap(getPartitionManager().getAllReplicaSets());
}
/**
@@ -191,8 +191,8 @@ public class LoadManager {
*
* @return Map<DataNodeId, loadScore>
*/
- public Map<Integer, Float> getAllLoadScores() {
- Map<Integer, Float> result = new ConcurrentHashMap<>();
+ public Map<Integer, Long> getAllLoadScores() {
+ Map<Integer, Long> result = new ConcurrentHashMap<>();
nodeCacheMap.forEach(
(dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
@@ -298,7 +298,7 @@ public class LoadManager {
}
private void broadcastLatestRegionRouteMap() {
- Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genRealTimeRoutingPolicy();
+ Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genLatestRegionRouteMap();
List<TDataNodeInfo> onlineDataNodes = getOnlineDataNodes(-1);
CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index bfcb0996e3..97c6d91952 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -45,9 +45,9 @@ public class RouteBalancer {
this.configManager = configManager;
}
- public Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy(
+ public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
List<TRegionReplicaSet> regionReplicaSets) {
- return genRouter().genRealTimeRoutingPolicy(regionReplicaSets);
+ return genRouter().genLatestRegionRouteMap(regionReplicaSets);
}
private IRouter genRouter() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
index 938f132a3e..9eea49a911 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/IRouter.java
@@ -38,6 +38,6 @@ public interface IRouter {
* for each Region is based on the order in the TRegionReplicaSet. The replica with higher
* sorting result have higher priority.
*/
- Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy(
+ Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
List<TRegionReplicaSet> replicaSets);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
index 9cc8ef8212..f0354cc983 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -35,15 +35,15 @@ public class LeaderRouter implements IRouter {
// Map<RegionGroupId, leader location>
private final Map<TConsensusGroupId, Integer> leaderMap;
// Map<DataNodeId, loadScore>
- private final Map<Integer, Float> loadScoreMap;
+ private final Map<Integer, Long> loadScoreMap;
- public LeaderRouter(Map<TConsensusGroupId, Integer> leaderMap, Map<Integer, Float> loadScoreMap) {
+ public LeaderRouter(Map<TConsensusGroupId, Integer> leaderMap, Map<Integer, Long> loadScoreMap) {
this.leaderMap = leaderMap;
this.loadScoreMap = loadScoreMap;
}
@Override
- public Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy(
+ public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
List<TRegionReplicaSet> replicaSets) {
Map<TConsensusGroupId, TRegionReplicaSet> result = new ConcurrentHashMap<>();
@@ -76,7 +76,7 @@ public class LeaderRouter implements IRouter {
new Pair<>(
(double)
loadScoreMap.computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty -> Float.MAX_VALUE),
+ dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
dataNodeLocation));
});
sortList.sort(Comparator.comparingDouble(Pair::getLeft));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
index d3b64e3999..04c3754113 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouter.java
@@ -33,15 +33,15 @@ import java.util.concurrent.ConcurrentHashMap;
public class LoadScoreGreedyRouter implements IRouter {
// Map<DataNodeId, loadScore>
- private final Map<Integer, Float> loadScoreMap;
+ private final Map<Integer, Long> loadScoreMap;
/** The constructor is used to pass in the load information needed by the algorithm */
- public LoadScoreGreedyRouter(Map<Integer, Float> loadScoreMap) {
+ public LoadScoreGreedyRouter(Map<Integer, Long> loadScoreMap) {
this.loadScoreMap = loadScoreMap;
}
@Override
- public Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy(
+ public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
List<TRegionReplicaSet> replicaSets) {
Map<TConsensusGroupId, TRegionReplicaSet> result = new ConcurrentHashMap<>();
@@ -63,7 +63,7 @@ public class LoadScoreGreedyRouter implements IRouter {
new Pair<>(
(double)
loadScoreMap.computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty -> Float.MAX_VALUE),
+ dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
dataNodeLocation));
});
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 597a9f944b..69452278be 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -91,7 +91,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
}
@Override
- public float getLoadScore() {
+ public long getLoadScore() {
// Return a copy of loadScore
switch (status) {
case Running:
@@ -99,7 +99,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
case Unknown:
default:
// The Unknown Node will get the highest loadScore
- return Float.MAX_VALUE;
+ return Long.MAX_VALUE;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 48aea8fd00..504305f2f9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -32,7 +32,7 @@ public class DataNodeHeartbeatCache implements INodeCache {
private final LinkedList<NodeHeartbeatSample> slidingWindow;
// For guiding queries, the higher the score the higher the load
- private volatile float loadScore;
+ private volatile long loadScore;
// For showing cluster
private volatile NodeStatus status;
@@ -94,7 +94,7 @@ public class DataNodeHeartbeatCache implements INodeCache {
}
@Override
- public float getLoadScore() {
+ public long getLoadScore() {
// Return a copy of loadScore
switch (status) {
case Running:
@@ -102,7 +102,7 @@ public class DataNodeHeartbeatCache implements INodeCache {
case Unknown:
default:
// The Unknown Node will get the highest loadScore
- return Float.MAX_VALUE;
+ return Long.MAX_VALUE;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
index 68de3916e0..cec510f8ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
@@ -37,8 +37,12 @@ public interface INodeCache {
*/
boolean updateLoadStatistic();
- /** @return The latest load score of a node, the higher the score the higher the load */
- float getLoadScore();
+ /**
+ * TODO: The loadScore of each Node will be changed to Double
+ *
+ * @return The latest load score of a node, the higher the score the higher the load
+ */
+ long getLoadScore();
/** @return The latest status of a node for showing cluster */
NodeStatus getNodeStatus();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8dc23cca07..f73854b6bf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -536,7 +536,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
@Override
public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
- return configManager.getLatestRegionRouteMap();
+ TRegionRouteMapResp resp = configManager.getLatestRegionRouteMap();
+ LOGGER.info("Generate a latest RegionRouteMap: {}", resp);
+ return resp;
}
@Override
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 2b43cc8ddc..2d0e7dc87a 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -23,76 +23,93 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class LeaderRouterTest {
- private static final TDataNodeLocation dataNodeLocation1 =
- new TDataNodeLocation(
- 1,
- new TEndPoint("0.0.0.0", 6667),
- new TEndPoint("0.0.0.0", 9003),
- new TEndPoint("0.0.0.0", 8777),
- new TEndPoint("0.0.0.0", 40010),
- new TEndPoint("0.0.0.0", 50010));
+ @Test
+ public void genRealTimeRoutingPolicy() {
+ /* Build TDataNodeLocations */
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ for (int i = 0; i < 6; i++) {
+ dataNodeLocations.add(
+ new TDataNodeLocation(
+ i,
+ new TEndPoint("0.0.0.0", 6667 + i),
+ new TEndPoint("0.0.0.0", 9003 + i),
+ new TEndPoint("0.0.0.0", 8777 + i),
+ new TEndPoint("0.0.0.0", 40010 + i),
+ new TEndPoint("0.0.0.0", 50010 + i)));
+ }
- private static final TDataNodeLocation dataNodeLocation2 =
- new TDataNodeLocation(
- 2,
- new TEndPoint("0.0.0.0", 6668),
- new TEndPoint("0.0.0.0", 9004),
- new TEndPoint("0.0.0.0", 8778),
- new TEndPoint("0.0.0.0", 40011),
- new TEndPoint("0.0.0.0", 50011));
+ /* Build nodeCacheMap */
+ long currentTimeMillis = System.currentTimeMillis();
+ Map<Integer, INodeCache> nodeCacheMap = new HashMap<>();
+ for (int i = 0; i < 6; i++) {
+ nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+ // Simulate that the DataNode-i returned a heartbeat at (currentTime - i * 1000) ms
+ nodeCacheMap
+ .get(i)
+ .cacheHeartbeatSample(
+ new NodeHeartbeatSample(currentTimeMillis - i * 1000, currentTimeMillis - i * 1000));
+ }
+ nodeCacheMap.values().forEach(INodeCache::updateLoadStatistic);
- private static final TDataNodeLocation dataNodeLocation3 =
- new TDataNodeLocation(
- 3,
- new TEndPoint("0.0.0.0", 6669),
- new TEndPoint("0.0.0.0", 9005),
- new TEndPoint("0.0.0.0", 8779),
- new TEndPoint("0.0.0.0", 40012),
- new TEndPoint("0.0.0.0", 50012));
+ /* Get the loadScoreMap */
+ Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
+ nodeCacheMap.forEach(
+ (dataNodeId, heartbeatCache) ->
+ loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
- @Test
- public void genRealTimeRoutingPolicy() {
- TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
+ /* Build TRegionReplicaSet */
+ TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
+ TRegionReplicaSet regionReplicaSet1 =
+ new TRegionReplicaSet(
+ groupId1,
+ Arrays.asList(
+ dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
+ TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
+ TRegionReplicaSet regionReplicaSet2 =
+ new TRegionReplicaSet(
+ groupId2,
+ Arrays.asList(
+ dataNodeLocations.get(5), dataNodeLocations.get(4), dataNodeLocations.get(3)));
/* Build leaderMap */
Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
- leaderMap.put(groupId, 2);
+ leaderMap.put(groupId1, 1);
+ leaderMap.put(groupId2, 4);
- /* Build loadScoreMap */
- Map<Integer, Float> loadScoreMap = new HashMap<>();
- loadScoreMap.put(1, (float) -10.0);
- loadScoreMap.put(2, (float) -20.0);
- loadScoreMap.put(3, (float) -30.0);
+ /* Check result */
+ Map<TConsensusGroupId, TRegionReplicaSet> result =
+ new LeaderRouter(leaderMap, loadScoreMap)
+ .genLatestRegionRouteMap(Arrays.asList(regionReplicaSet1, regionReplicaSet2));
+ Assert.assertEquals(2, result.size());
- /* Build TRegionReplicaSet */
- TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
- List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
- dataNodeLocations.add(dataNodeLocation1);
- dataNodeLocations.add(dataNodeLocation2);
- dataNodeLocations.add(dataNodeLocation3);
- regionReplicaSet.setRegionId(groupId);
- regionReplicaSet.setDataNodeLocations(dataNodeLocations);
+ TRegionReplicaSet result1 = result.get(groupId1);
+ // Leader first
+ Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
+ // The others will be sorted by loadScore
+ Assert.assertEquals(dataNodeLocations.get(0), result1.getDataNodeLocations().get(1));
+ Assert.assertEquals(dataNodeLocations.get(2), result1.getDataNodeLocations().get(2));
- TRegionReplicaSet result =
- new LeaderRouter(leaderMap, loadScoreMap)
- .genRealTimeRoutingPolicy(Collections.singletonList(regionReplicaSet))
- .get(groupId);
- /* Leader first */
- Assert.assertEquals(dataNodeLocation2, result.getDataNodeLocations().get(0));
- /* The others will be sorted by loadScore */
- Assert.assertEquals(dataNodeLocation3, result.getDataNodeLocations().get(1));
- Assert.assertEquals(dataNodeLocation1, result.getDataNodeLocations().get(2));
+ TRegionReplicaSet result2 = result.get(groupId2);
+ // Leader first
+ Assert.assertEquals(dataNodeLocations.get(4), result2.getDataNodeLocations().get(0));
+ // The others will be sorted by loadScore
+ Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
+ Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
}
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
index 528a88c973..faff66243e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LoadScoreGreedyRouterTest.java
@@ -23,70 +23,84 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class LoadScoreGreedyRouterTest {
- private static final TDataNodeLocation dataNodeLocation1 =
- new TDataNodeLocation(
- 1,
- new TEndPoint("0.0.0.0", 6667),
- new TEndPoint("0.0.0.0", 9003),
- new TEndPoint("0.0.0.0", 8777),
- new TEndPoint("0.0.0.0", 40010),
- new TEndPoint("0.0.0.0", 50010));
-
- private static final TDataNodeLocation dataNodeLocation2 =
- new TDataNodeLocation(
- 2,
- new TEndPoint("0.0.0.0", 6668),
- new TEndPoint("0.0.0.0", 9004),
- new TEndPoint("0.0.0.0", 8778),
- new TEndPoint("0.0.0.0", 40011),
- new TEndPoint("0.0.0.0", 50011));
+ @Test
+ public void testGenLoadScoreGreedyRoutingPolicy() {
+ /* Build TDataNodeLocations */
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ for (int i = 0; i < 6; i++) {
+ dataNodeLocations.add(
+ new TDataNodeLocation(
+ i,
+ new TEndPoint("0.0.0.0", 6667 + i),
+ new TEndPoint("0.0.0.0", 9003 + i),
+ new TEndPoint("0.0.0.0", 8777 + i),
+ new TEndPoint("0.0.0.0", 40010 + i),
+ new TEndPoint("0.0.0.0", 50010 + i)));
+ }
- private static final TDataNodeLocation dataNodeLocation3 =
- new TDataNodeLocation(
- 3,
- new TEndPoint("0.0.0.0", 6669),
- new TEndPoint("0.0.0.0", 9005),
- new TEndPoint("0.0.0.0", 8779),
- new TEndPoint("0.0.0.0", 40012),
- new TEndPoint("0.0.0.0", 50012));
+ /* Build nodeCacheMap */
+ long currentTimeMillis = System.currentTimeMillis();
+ Map<Integer, INodeCache> nodeCacheMap = new HashMap<>();
+ for (int i = 0; i < 6; i++) {
+ nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+ // Simulate that the DataNode-i returned a heartbeat at (currentTime - i * 1000) ms
+ nodeCacheMap
+ .get(i)
+ .cacheHeartbeatSample(
+ new NodeHeartbeatSample(currentTimeMillis - i * 1000, currentTimeMillis - i * 1000));
+ }
+ nodeCacheMap.values().forEach(INodeCache::updateLoadStatistic);
- @Test
- public void testGenRealTimeRoutingPolicy() {
- /* Build loadScoreMap */
- Map<Integer, Float> loadScoreMap = new HashMap<>();
- loadScoreMap.put(1, (float) -10.0);
- loadScoreMap.put(2, (float) -20.0);
- loadScoreMap.put(3, (float) -30.0);
+ /* Get the loadScoreMap */
+ Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
+ nodeCacheMap.forEach(
+ (dataNodeId, heartbeatCache) ->
+ loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
/* Build TRegionReplicaSet */
- TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
- TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
- List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
- dataNodeLocations.add(dataNodeLocation1);
- dataNodeLocations.add(dataNodeLocation2);
- dataNodeLocations.add(dataNodeLocation3);
- regionReplicaSet.setRegionId(groupId);
- regionReplicaSet.setDataNodeLocations(dataNodeLocations);
+ TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
+ TRegionReplicaSet regionReplicaSet1 =
+ new TRegionReplicaSet(
+ groupId1,
+ Arrays.asList(
+ dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
+ TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
+ TRegionReplicaSet regionReplicaSet2 =
+ new TRegionReplicaSet(
+ groupId2,
+ Arrays.asList(
+ dataNodeLocations.get(5), dataNodeLocations.get(4), dataNodeLocations.get(3)));
- TRegionReplicaSet result =
+ /* Check result */
+ Map<TConsensusGroupId, TRegionReplicaSet> result =
new LoadScoreGreedyRouter(loadScoreMap)
- .genRealTimeRoutingPolicy(Collections.singletonList(regionReplicaSet))
- .get(groupId);
- /* Sort the Replicas by their loadScore */
- Assert.assertEquals(dataNodeLocation3, result.getDataNodeLocations().get(0));
- Assert.assertEquals(dataNodeLocation2, result.getDataNodeLocations().get(1));
- Assert.assertEquals(dataNodeLocation1, result.getDataNodeLocations().get(2));
+ .genLatestRegionRouteMap(Arrays.asList(regionReplicaSet1, regionReplicaSet2));
+ Assert.assertEquals(2, result.size());
+
+ TRegionReplicaSet result1 = result.get(groupId1);
+ for (int i = 0; i < 3; i++) {
+ Assert.assertEquals(dataNodeLocations.get(i), result1.getDataNodeLocations().get(i));
+ }
+
+ TRegionReplicaSet result2 = result.get(groupId2);
+ for (int i = 3; i < 6; i++) {
+ Assert.assertEquals(dataNodeLocations.get(i), result2.getDataNodeLocations().get(i - 3));
+ }
}
}