You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/07/29 13:30:03 UTC

[iotdb] branch beyyes/confignode_develop updated: add more logs for old branch to test region route map

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/confignode_develop
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/confignode_develop by this push:
     new 800c8f58bf add more logs for old branch to test region route map
800c8f58bf is described below

commit 800c8f58bf339c06d09523c76e33bb2a76dff8d2
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Fri Jul 29 21:28:52 2022 +0800

    add more logs for old branch to test region route map
---
 .../iotdb/confignode/manager/load/LoadManager.java    | 11 ++++++-----
 .../manager/load/balancer/router/LeaderRouter.java    | 19 ++++++++++++-------
 .../load/heartbeat/DataNodeHeartbeatCache.java        |  7 ++++---
 .../manager/load/heartbeat/RegionGroupCache.java      |  7 +------
 .../manager/load/heartbeat/RegionHeartbeatSample.java |  2 ++
 .../service/thrift/ConfigNodeRPCServiceProcessor.java |  1 -
 .../load/balancer/router/LeaderRouterTest.java        | 10 ++++++++--
 .../thrift/impl/DataNodeInternalRPCServiceImpl.java   | 18 ++++++++----------
 8 files changed, 41 insertions(+), 34 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b0e9af9dc6..d7feb93edc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -80,7 +80,7 @@ public class LoadManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
 
   private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
-  private static final long heartbeatInterval = conf.getHeartbeatInterval();
+  private static final long HEARTBEAT_INTERVAL = conf.getHeartbeatInterval();
   public static final TEndPoint currentNode =
       new TEndPoint(conf.getInternalAddress(), conf.getInternalPort());
 
@@ -223,7 +223,7 @@ public class LoadManager {
                 heartBeatExecutor,
                 this::heartbeatLoopBody,
                 0,
-                heartbeatInterval,
+                HEARTBEAT_INTERVAL,
                 TimeUnit.MILLISECONDS);
         LOGGER.info("Heartbeat service is started successfully.");
       }
@@ -235,7 +235,7 @@ public class LoadManager {
                 loadBalancingExecutor,
                 this::updateNodeLoadStatistic,
                 0,
-                heartbeatInterval,
+                HEARTBEAT_INTERVAL,
                 TimeUnit.MILLISECONDS);
         LOGGER.info("LoadBalancing service is started successfully.");
       }
@@ -340,7 +340,7 @@ public class LoadManager {
    */
   private void pingRegisteredDataNodes(List<TDataNodeConfiguration> registeredDataNodes) {
     // Generate heartbeat request
-    THeartbeatReq heartbeatReq = genHeartbeatReq();
+    // THeartbeatReq heartbeatReq = genHeartbeatReq();
 
     // Send heartbeat requests
     for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
@@ -352,9 +352,10 @@ public class LoadManager {
                       dataNodeInfo.getLocation().getDataNodeId(),
                       empty -> new DataNodeHeartbeatCache()),
               regionGroupCacheMap);
+
       AsyncDataNodeClientPool.getInstance()
           .getDataNodeHeartBeat(
-              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+              dataNodeInfo.getLocation().getInternalEndPoint(), genHeartbeatReq(), handler);
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
index f0354cc983..553dc3a61e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /** The LeaderRouter always pick the leader Replica */
 public class LeaderRouter implements IRouter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeaderRouter.class);
 
   // Map<RegionGroupId, leader location>
   private final Map<TConsensusGroupId, Integer> leaderMap;
@@ -52,8 +56,7 @@ public class LeaderRouter implements IRouter {
           int leaderId = leaderMap.getOrDefault(replicaSet.getRegionId(), -1);
           TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
           sortedReplicaSet.setRegionId(replicaSet.getRegionId());
-
-          /* 1. Pick leader if leader exists */
+          // 1. Pick leader if leader exists
           if (leaderId != -1) {
             for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
               if (dataNodeLocation.getDataNodeId() == leaderId) {
@@ -61,9 +64,8 @@ public class LeaderRouter implements IRouter {
               }
             }
           }
-
-          /* 2. Sort replicaSets by loadScore and pick the rest */
-          // List<Pair<loadScore, TDataNodeLocation>> for sorting
+          // 2. Sort replicaSets by loadScore and pick the rest. List<Pair<loadScore,
+          // TDataNodeLocation>> for sorting
           List<Pair<Double, TDataNodeLocation>> sortList = new Vector<>();
           replicaSet
               .getDataNodeLocations()
@@ -85,10 +87,13 @@ public class LeaderRouter implements IRouter {
               sortedReplicaSet.addToDataNodeLocations(entry.getRight());
             }
           }
-
           result.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
         });
-
+    LOGGER.info("genLatestRegionRouteMap");
+    LOGGER.info("leaderMap:");
+    leaderMap.forEach((id, leader) -> System.out.println(id + "," + leader));
+    LOGGER.info("loadScoreMap:");
+    loadScoreMap.forEach((id, score) -> System.out.println(id + "," + score));
     return result;
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 495cb87682..5a2caf936f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -22,11 +22,12 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 
 import java.util.LinkedList;
 
-/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
+/**
+ * DataNodeHeartbeatCache caches and maintains all the heartbeat data. TODO: This class might be
+ * split into DataNodeCache and ConfigNodeCache.
+ */
 public class DataNodeHeartbeatCache implements INodeCache {
 
-  // TODO: This class might be split into DataNodeCache and ConfigNodeCache
-
   // Cache heartbeat samples
   private static final int MAXIMUM_WINDOW_SIZE = 100;
   private final LinkedList<NodeHeartbeatSample> slidingWindow;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index 99a9cae144..fd96e6335e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -29,9 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-/**
- * TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache
- */
+/** TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache */
 public class RegionGroupCache implements IRegionGroupCache {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionGroupCache.class);
@@ -41,9 +39,6 @@ public class RegionGroupCache implements IRegionGroupCache {
   private final TConsensusGroupId consensusGroupId;
 
   // Map<DataNodeId(where a RegionReplica resides), LinkedList<RegionHeartbeatSample>>
-
-  private final TConsensusGroupId consensusGroupId;
-
   private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
 
   // Indicates the version of the statistics
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
index 8631c2edbf..9a9d047a64 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
@@ -22,6 +22,8 @@ public class RegionHeartbeatSample {
 
   // Unit: ms
   private final long sendTimestamp;
+
+  // never used
   private final long receiveTimestamp;
 
   private final int belongedDataNodeId;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index ce12986a40..91054346e7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -57,7 +57,6 @@ import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
 import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ConsensusManager;
-import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 2a428446a0..8a8ef7b726 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.junit.Assert.assertFalse;
+
 public class LeaderRouterTest {
 
   @Test
@@ -68,13 +70,14 @@ public class LeaderRouterTest {
           .cacheHeartbeatSample(
               new NodeHeartbeatSample(currentTimeMillis - i * 1000, currentTimeMillis - i * 1000));
     }
-    nodeCacheMap.values().forEach(INodeCache::updateLoadStatistic);
+    nodeCacheMap.values().forEach(e -> assertFalse(e.updateLoadStatistic()));
 
     // Get the loadScoreMap
     Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
     nodeCacheMap.forEach(
         (dataNodeId, heartbeatCache) ->
             loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
+    System.out.println(loadScoreMap);
 
     // Build TRegionReplicaSet
     TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
@@ -124,8 +127,9 @@ public class LeaderRouterTest {
     regionGroupCacheMap.forEach(
         (groupId, regionGroupCache) ->
             leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+    System.out.println(leaderMap);
 
-    // Check result
+    // Check result
     Map<TConsensusGroupId, TRegionReplicaSet> result =
         new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
     TRegionReplicaSet result1 = result.get(groupId1);
@@ -168,6 +172,7 @@ public class LeaderRouterTest {
       regionGroupCacheMap.forEach(
           (groupId, regionGroupCache) ->
               leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+      // System.out.println("multi-leader: " + leaderMap);
 
       // Check result
       result = new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
@@ -207,6 +212,7 @@ public class LeaderRouterTest {
     regionGroupCacheMap.forEach(
         (groupId, regionGroupCache) ->
             leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+    System.out.println("multi-leader after a datanode down: " + leaderMap);
 
     // Check result
     result = new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 31b8728813..9d4d1caf71 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -427,22 +427,20 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       DataRegionConsensusImpl.getInstance()
           .getAllConsensusGroupIds()
           .forEach(
-              groupId -> {
-                result.put(
-                    groupId.convertToTConsensusGroupId(),
-                    DataRegionConsensusImpl.getInstance().isLeader(groupId));
-              });
+              groupId ->
+                  result.put(
+                      groupId.convertToTConsensusGroupId(),
+                      DataRegionConsensusImpl.getInstance().isLeader(groupId)));
     }
 
     if (SchemaRegionConsensusImpl.getInstance() != null) {
       SchemaRegionConsensusImpl.getInstance()
           .getAllConsensusGroupIds()
           .forEach(
-              groupId -> {
-                result.put(
-                    groupId.convertToTConsensusGroupId(),
-                    SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
-              });
+              groupId ->
+                  result.put(
+                      groupId.convertToTConsensusGroupId(),
+                      SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
     }
     return result;
   }