You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/21 07:00:21 UTC

[GitHub] [iotdb] wangchao316 commented on a diff in pull request #7642: [IOTDB-4685] Consensus heartbeat statistics

wangchao316 commented on code in PR #7642:
URL: https://github.com/apache/iotdb/pull/7642#discussion_r1001432922


##########
confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java:
##########
@@ -297,6 +298,12 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
         return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan);
       case DropPipe:
         return syncInfo.dropPipe((DropPipePlan) physicalPlan);
+      case UpdateLoadStatistics:
+        LOGGER.info(
+            "[UpdateLoadStatistics] Update cluster load statistics, timestamp: {}",
+            System.currentTimeMillis());
+        nodeInfo.updateNodeStatistics((UpdateLoadStatisticsPlan) physicalPlan);

Review Comment:
   This log.info call should be printed inside the updateNodeStatistics method instead.



##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/node/BaseNodeCache.java:
##########
@@ -32,49 +33,66 @@ public abstract class BaseNodeCache {
   public static final int MAXIMUM_WINDOW_SIZE = 100;
 
   /** SlidingWindow stores the heartbeat sample data */
-  final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
+  protected final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
 
-  /** The current status of the Node */
-  volatile NodeStatus status = NodeStatus.Unknown;
-  /** The reason why lead to the current NodeStatus (for showing cluster) */
-  volatile String statusReason;
+  protected volatile NodeStatistics statistics;

Review Comment:
   volatile only guarantees visibility; it does not provide synchronization...



##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ConfigNodeHeartbeatCache.java:
##########
@@ -59,20 +59,19 @@ public boolean updateNodeStatus() {
       }
     }
 
-    String originStatus = status.getStatus();
-
+    // Update Node status
+    NodeStatus status;
     // TODO: Optimize judge logic
     if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
       status = NodeStatus.Unknown;
     } else {
       status = NodeStatus.Running;
     }
-    return !status.getStatus().equals(originStatus);
-  }
 
-  @Override
-  public long getLoadScore() {
-    // The ConfigNode whose status isn't Running will get the highest loadScore
-    return status == NodeStatus.Running ? 0 : Long.MAX_VALUE;
+    // Update loadScore
+    long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
+
+    NodeStatistics newStatistics = new NodeStatistics(loadScore, status, null);
+    return newStatistics.equals(statistics) ? null : (statistics = newStatistics);

Review Comment:
   Should this return null, or 0?



##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java:
##########
@@ -154,85 +154,64 @@ public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap() {
   /** Start the load balancing service */
   public void startLoadBalancingService() {
     synchronized (scheduleMonitor) {
-      if (currentLoadBalancingFuture == null) {
-        currentLoadBalancingFuture =
+      if (currentLoadStatisticsFuture == null) {
+        currentLoadStatisticsFuture =
             ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                loadBalancingExecutor,
-                this::updateNodeLoadStatistic,
+                loadStatisticsExecutor,
+                this::updateLoadStatistics,
                 0,
-                NodeManager.HEARTBEAT_INTERVAL,
+                HEARTBEAT_INTERVAL,
                 TimeUnit.MILLISECONDS);
-        LOGGER.info("LoadBalancing service is started successfully.");
+        LOGGER.info("LoadStatistics service is started successfully.");
       }
     }
   }
 
   /** Stop the load balancing service */
   public void stopLoadBalancingService() {
     synchronized (scheduleMonitor) {
-      if (currentLoadBalancingFuture != null) {
-        currentLoadBalancingFuture.cancel(false);
-        currentLoadBalancingFuture = null;
-        LOGGER.info("LoadBalancing service is stopped successfully.");
+      if (currentLoadStatisticsFuture != null) {
+        currentLoadStatisticsFuture.cancel(false);
+        currentLoadStatisticsFuture = null;
+        LOGGER.info("LoadStatistics service is stopped successfully.");
       }
     }
   }
 
-  private void updateNodeLoadStatistic() {
-    AtomicBoolean existDataNodeChangesStatus = new AtomicBoolean(false);
-    AtomicBoolean existSchemaRegionGroupChangesLeader = new AtomicBoolean(false);
-    AtomicBoolean existDataRegionGroupChangesLeader = new AtomicBoolean(false);
-    boolean isNeedBroadcast = false;
+  private void updateLoadStatistics() {
+    UpdateLoadStatisticsPlan updateLoadStatisticsPlan = new UpdateLoadStatisticsPlan();
 
+    // Update NodeStatistics
     getNodeManager()
         .getNodeCacheMap()
-        .values()
         .forEach(
-            nodeCache -> {
-              boolean updateResult = nodeCache.updateNodeStatus();
-              if (nodeCache instanceof DataNodeHeartbeatCache) {
-                // Check if some DataNodes changes status
-                existDataNodeChangesStatus.compareAndSet(false, updateResult);
+            (nodeId, nodeCache) -> {
+              // Check if NodeStatistics needs to be updated
+              NodeStatistics nodeStatistics = nodeCache.updateNodeStatistics();
+              if (nodeStatistics != null) {
+                updateLoadStatisticsPlan.putNodeStatistics(nodeId, nodeStatistics);
               }
             });
 
+    // Update RegionGroupStatistics
     getPartitionManager()
         .getRegionGroupCacheMap()
-        .values()
         .forEach(
-            regionGroupCache -> {
-              boolean updateResult = regionGroupCache.updateRegionStatistics();
-              switch (regionGroupCache.getConsensusGroupId().getType()) {
-                  // Check if some RegionGroups change their leader
-                case SchemaRegion:
-                  existSchemaRegionGroupChangesLeader.compareAndSet(false, updateResult);
-                  break;
-                case DataRegion:
-                  existDataRegionGroupChangesLeader.compareAndSet(false, updateResult);
-                  break;
+            (consensusGroupId, regionGroupCache) -> {
+              // Check if RegionGroupStatistics needs to be updated
+              RegionGroupStatistics regionGroupStatistics =
+                  regionGroupCache.updateRegionGroupStatistics();
+              if (regionGroupStatistics != null) {
+                updateLoadStatisticsPlan.putRegionGroupStatistics(
+                    consensusGroupId, regionGroupStatistics);
               }
             });
 
-    if (existDataNodeChangesStatus.get()) {
-      // The RegionRouteMap must be broadcast if some DataNodes change status
-      isNeedBroadcast = true;
-    }
-
-    if (RouteBalancer.LEADER_POLICY.equals(CONF.getRoutingPolicy())) {
-      // Check the condition of leader routing policy
-      if (existSchemaRegionGroupChangesLeader.get()) {
-        // Broadcast the RegionRouteMap if some SchemaRegionGroups change their leader
-        isNeedBroadcast = true;
-      }
-      if (!ConsensusFactory.MultiLeaderConsensus.equals(CONF.getDataRegionConsensusProtocolClass())
-          && existDataRegionGroupChangesLeader.get()) {
-        // Broadcast the RegionRouteMap if some DataRegionGroups change their leader
-        // and the consensus protocol isn't MultiLeader
-        isNeedBroadcast = true;
-      }
-    }
+    // Update LoadStatistics if necessary
+    if (updateLoadStatisticsPlan.isNeedUpdate()) {

Review Comment:
   When is isNeedUpdate set?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org