You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/13 11:42:48 UTC

[inlong] 01/02: [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 955e4d5d7852284fd6e3f97b0926c05083a7a683
Author: Lucas <10...@users.noreply.github.com>
AuthorDate: Thu Oct 13 15:33:59 2022 +0800

    [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)
    
    * Fix heartbeat status update
    * Add comments and make the code more readable
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../service/core/heartbeat/HeartbeatManager.java   | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
index 6027fa908..757ab5bea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
@@ -46,11 +46,13 @@ import javax.annotation.PostConstruct;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-@Component
 @Slf4j
+@Component
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
     private static final String AUTO_REGISTERED = "auto registered";
+    private static final int UPDATED_ONE_ROW = 1; // updated one row
+    private static final int UPDATE_ZERO_ROW = 0; // no field updated
 
     @Getter
     private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
@@ -92,18 +94,27 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
                     componentHeartbeat.getComponentType());
             return;
         }
+
+        // if the heartbeat is not in the cache yet, insert or update the cluster node using the heartbeat info
         HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
+        boolean exist = true;
+        int updateNum = UPDATE_ZERO_ROW;
         if (lastHeartbeat == null) {
+            exist = false;
             InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
             if (clusterNode == null) {
-                insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
-                log.info("insert node success");
+                updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
+                log.info("insert node result: {}", updateNum);
             } else {
-                updateClusterNode(clusterNode);
-                log.info("update node success");
+                updateNum = updateClusterNode(clusterNode);
+                log.info("update node result: {}", updateNum);
             }
         }
-        heartbeatCache.put(componentHeartbeat, heartbeat);
+
+        // cache the heartbeat if it was already cached, or if it was new and the insert/update succeeded
+        if (exist || updateNum == UPDATED_ONE_ROW) {
+            heartbeatCache.put(componentHeartbeat, heartbeat);
+        }
     }
 
     private void evictClusterNode(HeartbeatMsg heartbeat) {
@@ -134,7 +145,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         return clusterNodeMapper.selectByUniqueKey(nodeRequest);
     }
 
-    private void insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
+    private int insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
         InlongClusterNodeEntity clusterNode = new InlongClusterNodeEntity();
         clusterNode.setParentId(clusterInfo.getId());
         clusterNode.setType(heartbeat.getComponentType());
@@ -144,12 +155,12 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         clusterNode.setCreator(creator);
         clusterNode.setModifier(creator);
         clusterNode.setDescription(AUTO_REGISTERED);
-        clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
+        return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
     }
 
-    private void updateClusterNode(InlongClusterNodeEntity clusterNode) {
+    private int updateClusterNode(InlongClusterNodeEntity clusterNode) {
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
-        clusterNodeMapper.updateById(clusterNode);
+        return clusterNodeMapper.updateById(clusterNode);
     }
 
     private ClusterInfo fetchCluster(ComponentHeartbeat componentHeartbeat) {