You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/01 06:43:16 UTC

[iotdb] branch beyyes/confignode_develop updated: add more logs for DataNodeHeartbeat

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/confignode_develop
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/confignode_develop by this push:
     new 5ada3b7e4e add more logs for DataNodeHeartbeat
5ada3b7e4e is described below

commit 5ada3b7e4e5af9465a5351ff4825f84acf008252
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Aug 1 14:42:56 2022 +0800

    add more logs for DataNodeHeartbeat
---
 .../async/handlers/DataNodeHeartbeatHandler.java     |  6 ++++++
 .../iotdb/confignode/manager/ConfigManager.java      |  2 +-
 .../iotdb/confignode/manager/load/LoadManager.java   |  4 ++--
 .../manager/load/balancer/router/LeaderRouter.java   | 20 +++++++++++++++-----
 .../load/heartbeat/ConfigNodeHeartbeatCache.java     |  6 +++---
 .../thrift/impl/DataNodeInternalRPCServiceImpl.java  |  1 +
 6 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index 6891c032e7..1e2832a911 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -61,6 +61,12 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
 
     // Update RegionCache
     if (heartbeatResp.isSetJudgedLeaders()) {
+      LOGGER.info(
+          "DataNodeHeartbeatHandler complete, heartBeatTime: {}, receiveTime: {}, dataNode: {}, judgeLeaders: {}",
+          heartbeatResp.getHeartbeatTimestamp(),
+          receiveTime,
+          dataNodeLocation.getDataNodeId(),
+          heartbeatResp.getJudgedLeaders());
       heartbeatResp
           .getJudgedLeaders()
           .forEach(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a5c253906e..c27a54cad2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -487,7 +487,7 @@ public class ConfigManager implements IManager {
       // TODO: Delete or hide this LOGGER before officially release.
       LOGGER.info(
           "GetOrCreateSchemaPartition receive devicePaths: {}, return TSchemaPartitionResp: {}",
-          devicePaths,
+          new HashSet<>(devicePaths),
           resp);
 
       return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d7feb93edc..c998ee43b1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -81,7 +81,7 @@ public class LoadManager {
 
   private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
   private static final long HEARTBEAT_INTERVAL = conf.getHeartbeatInterval();
-  public static final TEndPoint currentNode =
+  public static final TEndPoint CURRENT_NODE =
       new TEndPoint(conf.getInternalAddress(), conf.getInternalPort());
 
   private final IManager configManager;
@@ -368,7 +368,7 @@ public class LoadManager {
 
     // Send heartbeat requests
     for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
-      if (configNodeLocation.getInternalEndPoint().equals(currentNode)) {
+      if (configNodeLocation.getInternalEndPoint().equals(CURRENT_NODE)) {
         // Skip itself
         nodeCacheMap.putIfAbsent(
             configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
index 3cc9b7f612..dcfc1726b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -26,11 +26,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Vector;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /** The LeaderRouter always pick the leader Replica */
 public class LeaderRouter implements IRouter {
@@ -38,6 +36,7 @@ public class LeaderRouter implements IRouter {
 
   // Map<RegionGroupId, leader location>
   private final Map<TConsensusGroupId, Integer> leaderMap;
+
   // Map<DataNodeId, loadScore>
   private final Map<Integer, Long> loadScoreMap;
 
@@ -66,7 +65,7 @@ public class LeaderRouter implements IRouter {
           }
           // 2. Sort replicaSets by loadScore and pick the rest. List<Pair<loadScore,
           // TDataNodeLocation>> for sorting
-          List<Pair<Double, TDataNodeLocation>> sortList = new Vector<>();
+          List<Pair<Double, TDataNodeLocation>> sortList = new ArrayList<>();
           replicaSet
               .getDataNodeLocations()
               .forEach(
@@ -89,6 +88,17 @@ public class LeaderRouter implements IRouter {
           }
           result.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
         });
+    LOGGER.info("replicaSets:");
+    replicaSets.forEach(
+        t ->
+            LOGGER.info(
+                t.getRegionId().getType()
+                    + ","
+                    + t.getRegionId().getId()
+                    + ","
+                    + t.getDataNodeLocations().stream()
+                        .map(TDataNodeLocation::getDataNodeId)
+                        .collect(Collectors.toList())));
     LOGGER.info("genLatestRegionRouteMap:");
     LOGGER.info("leaderMap:");
     leaderMap.forEach((id, leader) -> LOGGER.info(id + "," + leader));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 69452278be..5deb9eaa20 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -27,7 +27,7 @@ import java.util.LinkedList;
 public class ConfigNodeHeartbeatCache implements INodeCache {
 
   // Cache heartbeat samples
-  private static final int maximumWindowSize = 100;
+  private static final int MAXIMUM_WINDOW_SIZE = 100;
   private final LinkedList<NodeHeartbeatSample> slidingWindow;
 
   private final TConfigNodeLocation configNodeLocation;
@@ -51,7 +51,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
         slidingWindow.add(newHeartbeatSample);
       }
 
-      if (slidingWindow.size() > maximumWindowSize) {
+      if (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
         slidingWindow.removeFirst();
       }
     }
@@ -59,7 +59,7 @@ public class ConfigNodeHeartbeatCache implements INodeCache {
 
   @Override
   public boolean updateLoadStatistic() {
-    if (configNodeLocation.getInternalEndPoint().equals(LoadManager.currentNode)) {
+    if (configNodeLocation.getInternalEndPoint().equals(LoadManager.CURRENT_NODE)) {
       // We don't need to update itself
       return false;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9d4d1caf71..8f166ab2d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -382,6 +382,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   @Override
   public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws TException {
+
     THeartbeatResp resp = new THeartbeatResp();
     resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());