You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/07/29 13:30:03 UTC
[iotdb] branch beyyes/confignode_develop updated: add more logs for old branch to test region route map
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/confignode_develop
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/confignode_develop by this push:
new 800c8f58bf add more logs for old branch to test region route map
800c8f58bf is described below
commit 800c8f58bf339c06d09523c76e33bb2a76dff8d2
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Fri Jul 29 21:28:52 2022 +0800
add more logs for old branch to test region route map
---
.../iotdb/confignode/manager/load/LoadManager.java | 11 ++++++-----
.../manager/load/balancer/router/LeaderRouter.java | 19 ++++++++++++-------
.../load/heartbeat/DataNodeHeartbeatCache.java | 7 ++++---
.../manager/load/heartbeat/RegionGroupCache.java | 7 +------
.../manager/load/heartbeat/RegionHeartbeatSample.java | 2 ++
.../service/thrift/ConfigNodeRPCServiceProcessor.java | 1 -
.../load/balancer/router/LeaderRouterTest.java | 10 ++++++++--
.../thrift/impl/DataNodeInternalRPCServiceImpl.java | 18 ++++++++----------
8 files changed, 41 insertions(+), 34 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b0e9af9dc6..d7feb93edc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -80,7 +80,7 @@ public class LoadManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
- private static final long heartbeatInterval = conf.getHeartbeatInterval();
+ private static final long HEARTBEAT_INTERVAL = conf.getHeartbeatInterval();
public static final TEndPoint currentNode =
new TEndPoint(conf.getInternalAddress(), conf.getInternalPort());
@@ -223,7 +223,7 @@ public class LoadManager {
heartBeatExecutor,
this::heartbeatLoopBody,
0,
- heartbeatInterval,
+ HEARTBEAT_INTERVAL,
TimeUnit.MILLISECONDS);
LOGGER.info("Heartbeat service is started successfully.");
}
@@ -235,7 +235,7 @@ public class LoadManager {
loadBalancingExecutor,
this::updateNodeLoadStatistic,
0,
- heartbeatInterval,
+ HEARTBEAT_INTERVAL,
TimeUnit.MILLISECONDS);
LOGGER.info("LoadBalancing service is started successfully.");
}
@@ -340,7 +340,7 @@ public class LoadManager {
*/
private void pingRegisteredDataNodes(List<TDataNodeConfiguration> registeredDataNodes) {
// Generate heartbeat request
- THeartbeatReq heartbeatReq = genHeartbeatReq();
+ // THeartbeatReq heartbeatReq = genHeartbeatReq();
// Send heartbeat requests
for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
@@ -352,9 +352,10 @@ public class LoadManager {
dataNodeInfo.getLocation().getDataNodeId(),
empty -> new DataNodeHeartbeatCache()),
regionGroupCacheMap);
+
AsyncDataNodeClientPool.getInstance()
.getDataNodeHeartBeat(
- dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+ dataNodeInfo.getLocation().getInternalEndPoint(), genHeartbeatReq(), handler);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
index f0354cc983..553dc3a61e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -31,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
/** The LeaderRouter always pick the leader Replica */
public class LeaderRouter implements IRouter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LeaderRouter.class);
// Map<RegionGroupId, leader location>
private final Map<TConsensusGroupId, Integer> leaderMap;
@@ -52,8 +56,7 @@ public class LeaderRouter implements IRouter {
int leaderId = leaderMap.getOrDefault(replicaSet.getRegionId(), -1);
TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
sortedReplicaSet.setRegionId(replicaSet.getRegionId());
-
- /* 1. Pick leader if leader exists */
+ // 1. Pick leader if leader exists
if (leaderId != -1) {
for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
if (dataNodeLocation.getDataNodeId() == leaderId) {
@@ -61,9 +64,8 @@ public class LeaderRouter implements IRouter {
}
}
}
-
- /* 2. Sort replicaSets by loadScore and pick the rest */
- // List<Pair<loadScore, TDataNodeLocation>> for sorting
+ // 2. Sort the remaining replicas by loadScore and pick the rest.
+ // sortList holds Pair<loadScore, TDataNodeLocation> entries for sorting.
List<Pair<Double, TDataNodeLocation>> sortList = new Vector<>();
replicaSet
.getDataNodeLocations()
@@ -85,10 +87,13 @@ public class LeaderRouter implements IRouter {
sortedReplicaSet.addToDataNodeLocations(entry.getRight());
}
}
-
result.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
});
-
+ LOGGER.info("genLatestRegionRouteMap");
+ LOGGER.info("leaderMap:");
+ leaderMap.forEach((id, leader) -> System.out.println(id + "," + leader));
+ LOGGER.info("loadScoreMap:");
+ loadScoreMap.forEach((id, score) -> System.out.println(id + "," + score));
return result;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 495cb87682..5a2caf936f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -22,11 +22,12 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import java.util.LinkedList;
-/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
+/**
+ * DataNodeHeartbeatCache caches and maintains all the heartbeat data. TODO: This class might be
+ * split into DataNodeCache and ConfigNodeCache.
+ */
public class DataNodeHeartbeatCache implements INodeCache {
- // TODO: This class might be split into DataNodeCache and ConfigNodeCache
-
// Cache heartbeat samples
private static final int MAXIMUM_WINDOW_SIZE = 100;
private final LinkedList<NodeHeartbeatSample> slidingWindow;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index 99a9cae144..fd96e6335e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -29,9 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-/**
- * TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache
- */
+/** TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache */
public class RegionGroupCache implements IRegionGroupCache {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionGroupCache.class);
@@ -41,9 +39,6 @@ public class RegionGroupCache implements IRegionGroupCache {
private final TConsensusGroupId consensusGroupId;
// Map<DataNodeId(where a RegionReplica resides), LinkedList<RegionHeartbeatSample>>
-
- private final TConsensusGroupId consensusGroupId;
-
private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
// Indicates the version of the statistics
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
index 8631c2edbf..9a9d047a64 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
@@ -22,6 +22,8 @@ public class RegionHeartbeatSample {
// Unit: ms
private final long sendTimestamp;
+
+ // never used
private final long receiveTimestamp;
private final int belongedDataNodeId;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index ce12986a40..91054346e7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -57,7 +57,6 @@ import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 2a428446a0..8a8ef7b726 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -40,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import static org.junit.Assert.assertFalse;
+
public class LeaderRouterTest {
@Test
@@ -68,13 +70,14 @@ public class LeaderRouterTest {
.cacheHeartbeatSample(
new NodeHeartbeatSample(currentTimeMillis - i * 1000, currentTimeMillis - i * 1000));
}
- nodeCacheMap.values().forEach(INodeCache::updateLoadStatistic);
+ nodeCacheMap.values().forEach(e -> assertFalse(e.updateLoadStatistic()));
// Get the loadScoreMap
Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
nodeCacheMap.forEach(
(dataNodeId, heartbeatCache) ->
loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
+ System.out.println(loadScoreMap);
// Build TRegionReplicaSet
TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
@@ -124,8 +127,9 @@ public class LeaderRouterTest {
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+ System.out.println(leaderMap);
- // Check result
+ // Check result
Map<TConsensusGroupId, TRegionReplicaSet> result =
new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
TRegionReplicaSet result1 = result.get(groupId1);
@@ -168,6 +172,7 @@ public class LeaderRouterTest {
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+ // System.out.println("multi-leader: " + leaderMap);
// Check result
result = new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
@@ -207,6 +212,7 @@ public class LeaderRouterTest {
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+ System.out.println("multi-leader after a datanode down: " + leaderMap);
// Check result
result = new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 31b8728813..9d4d1caf71 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -427,22 +427,20 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
DataRegionConsensusImpl.getInstance()
.getAllConsensusGroupIds()
.forEach(
- groupId -> {
- result.put(
- groupId.convertToTConsensusGroupId(),
- DataRegionConsensusImpl.getInstance().isLeader(groupId));
- });
+ groupId ->
+ result.put(
+ groupId.convertToTConsensusGroupId(),
+ DataRegionConsensusImpl.getInstance().isLeader(groupId)));
}
if (SchemaRegionConsensusImpl.getInstance() != null) {
SchemaRegionConsensusImpl.getInstance()
.getAllConsensusGroupIds()
.forEach(
- groupId -> {
- result.put(
- groupId.convertToTConsensusGroupId(),
- SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
- });
+ groupId ->
+ result.put(
+ groupId.convertToTConsensusGroupId(),
+ SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
}
return result;
}