You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/01 06:43:16 UTC
[iotdb] branch beyyes/confignode_develop updated: add more logs for DataNodeHeartbeat
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/confignode_develop
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/confignode_develop by this push:
new 5ada3b7e4e add more logs for DataNodeHeartbeat
5ada3b7e4e is described below
commit 5ada3b7e4e5af9465a5351ff4825f84acf008252
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Aug 1 14:42:56 2022 +0800
add more logs for DataNodeHeartbeat
---
.../async/handlers/DataNodeHeartbeatHandler.java | 6 ++++++
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../iotdb/confignode/manager/load/LoadManager.java | 4 ++--
.../manager/load/balancer/router/LeaderRouter.java | 20 +++++++++++++++-----
.../load/heartbeat/ConfigNodeHeartbeatCache.java | 6 +++---
.../thrift/impl/DataNodeInternalRPCServiceImpl.java | 1 +
6 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index 6891c032e7..1e2832a911 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -61,6 +61,12 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
// Update RegionCache
if (heartbeatResp.isSetJudgedLeaders()) {
+ LOGGER.info(
+ "DataNodeHeartbeatHandler complete, heartBeatTime: {}, receiveTime: {}, dataNode: {}, judgeLeaders: {}",
+ heartbeatResp.getHeartbeatTimestamp(),
+ receiveTime,
+ dataNodeLocation.getDataNodeId(),
+ heartbeatResp.getJudgedLeaders());
heartbeatResp
.getJudgedLeaders()
.forEach(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a5c253906e..c27a54cad2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -487,7 +487,7 @@ public class ConfigManager implements IManager {
// TODO: Delete or hide this LOGGER before officially release.
LOGGER.info(
"GetOrCreateSchemaPartition receive devicePaths: {}, return TSchemaPartitionResp: {}",
- devicePaths,
+ new HashSet<>(devicePaths),
resp);
return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d7feb93edc..c998ee43b1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -81,7 +81,7 @@ public class LoadManager {
private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
private static final long HEARTBEAT_INTERVAL = conf.getHeartbeatInterval();
- public static final TEndPoint currentNode =
+ public static final TEndPoint CURRENT_NODE =
new TEndPoint(conf.getInternalAddress(), conf.getInternalPort());
private final IManager configManager;
@@ -368,7 +368,7 @@ public class LoadManager {
// Send heartbeat requests
for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
- if (configNodeLocation.getInternalEndPoint().equals(currentNode)) {
+ if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
// Skip itself
nodeCacheMap.putIfAbsent(
configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
index 3cc9b7f612..dcfc1726b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -26,11 +26,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Vector;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/** The LeaderRouter always pick the leader Replica */
public class LeaderRouter implements IRouter {
@@ -38,6 +36,7 @@ public class LeaderRouter implements IRouter {
// Map<RegionGroupId, leader location>
private final Map<TConsensusGroupId, Integer> leaderMap;
+
// Map<DataNodeId, loadScore>
private final Map<Integer, Long> loadScoreMap;
@@ -66,7 +65,7 @@ public class LeaderRouter implements IRouter {
}
// 2. Sort replicaSets by loadScore and pick the rest. List<Pair<loadScore,
// TDataNodeLocation>> for sorting
- List<Pair<Double, TDataNodeLocation>> sortList = new Vector<>();
+ List<Pair<Double, TDataNodeLocation>> sortList = new ArrayList<>();
replicaSet
.getDataNodeLocations()
.forEach(
@@ -89,6 +88,17 @@ public class LeaderRouter implements IRouter {
}
result.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
});
+ LOGGER.info("replicaSets:");
+ replicaSets.forEach(
+ t ->
+ LOGGER.info(
+ t.getRegionId().getType()
+ + ","
+ + t.getRegionId().getId()
+ + ","
+ + t.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList())));
LOGGER.info("genLatestRegionRouteMap:");
LOGGER.info("leaderMap:");
leaderMap.forEach((id, leader) -> LOGGER.info(id + "," + leader));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 69452278be..5deb9eaa20 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -27,7 +27,7 @@ import java.util.LinkedList;
public class ConfigNodeHeartbeatCache implements INodeCache {
// Cache heartbeat samples
- private static final int maximumWindowSize = 100;
+ private static final int MAXIMUM_WINDOW_SIZE = 100;
private final LinkedList<NodeHeartbeatSample> slidingWindow;
private final TConfigNodeLocation configNodeLocation;
@@ -51,7 +51,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
slidingWindow.add(newHeartbeatSample);
}
- if (slidingWindow.size() > maximumWindowSize) {
+ if (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
slidingWindow.removeFirst();
}
}
@@ -59,7 +59,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
@Override
public boolean updateLoadStatistic() {
- if (configNodeLocation.getInternalEndPoint().equals(LoadManager.currentNode)) {
+ if (configNodeLocation.getInternalEndPoint().equals(LoadManager.CURRENT_NODE)) {
// We don't need to update itself
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9d4d1caf71..8f166ab2d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -382,6 +382,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws TException {
+
THeartbeatResp resp = new THeartbeatResp();
resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());