You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/13 11:42:48 UTC
[inlong] 01/02: [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 955e4d5d7852284fd6e3f97b0926c05083a7a683
Author: Lucas <10...@users.noreply.github.com>
AuthorDate: Thu Oct 13 15:33:59 2022 +0800
[INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)
* Fix heartbeat status update
* Add comments and make the code more readable
Co-authored-by: healchow <he...@gmail.com>
---
.../service/core/heartbeat/HeartbeatManager.java | 31 +++++++++++++++-------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
index 6027fa908..757ab5bea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
@@ -46,11 +46,13 @@ import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-@Component
@Slf4j
+@Component
public class HeartbeatManager implements AbstractHeartbeatManager {
private static final String AUTO_REGISTERED = "auto registered";
+ private static final int UPDATED_ONE_ROW = 1; // updated one row
+ private static final int UPDATE_ZERO_ROW = 0; // no field updated
@Getter
private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
@@ -92,18 +94,27 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
componentHeartbeat.getComponentType());
return;
}
+
+ // if the heartbeat was not in the cache, insert or update the node by the heartbeat info
HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
+ boolean exist = true;
+ int updateNum = UPDATE_ZERO_ROW;
if (lastHeartbeat == null) {
+ exist = false;
InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
if (clusterNode == null) {
- insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
- log.info("insert node success");
+ updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
+ log.info("insert node result: {}", updateNum);
} else {
- updateClusterNode(clusterNode);
- log.info("update node success");
+ updateNum = updateClusterNode(clusterNode);
+ log.info("update node result: {}", updateNum);
}
}
- heartbeatCache.put(componentHeartbeat, heartbeat);
+
+ // if the heartbeat already exists, or does not exist but insert/update success, then put it into the cache
+ if (exist || updateNum == UPDATED_ONE_ROW) {
+ heartbeatCache.put(componentHeartbeat, heartbeat);
+ }
}
private void evictClusterNode(HeartbeatMsg heartbeat) {
@@ -134,7 +145,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
return clusterNodeMapper.selectByUniqueKey(nodeRequest);
}
- private void insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
+ private int insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
InlongClusterNodeEntity clusterNode = new InlongClusterNodeEntity();
clusterNode.setParentId(clusterInfo.getId());
clusterNode.setType(heartbeat.getComponentType());
@@ -144,12 +155,12 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
clusterNode.setCreator(creator);
clusterNode.setModifier(creator);
clusterNode.setDescription(AUTO_REGISTERED);
- clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
+ return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
}
- private void updateClusterNode(InlongClusterNodeEntity clusterNode) {
+ private int updateClusterNode(InlongClusterNodeEntity clusterNode) {
clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
- clusterNodeMapper.updateById(clusterNode);
+ return clusterNodeMapper.updateById(clusterNode);
}
private ClusterInfo fetchCluster(ComponentHeartbeat componentHeartbeat) {