You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/13 11:42:47 UTC

[inlong] branch release-1.3.0 updated (5cf0b4311 -> 2247d7d5b)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 5cf0b4311 [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
     new 955e4d5d7 [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)
     new 2247d7d5b [INLONG-6171][Manager] Fix the lightweight group union nodes relation error (#6172)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../service/core/heartbeat/HeartbeatManager.java   | 31 +++++++++++++++-------
 .../resource/sort/DefaultSortConfigOperator.java   | 16 ++++++-----
 2 files changed, 31 insertions(+), 16 deletions(-)


[inlong] 01/02: [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 955e4d5d7852284fd6e3f97b0926c05083a7a683
Author: Lucas <10...@users.noreply.github.com>
AuthorDate: Thu Oct 13 15:33:59 2022 +0800

    [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161)
    
    * Fix heartbeat status update
    * Add comments and make the code more readable
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../service/core/heartbeat/HeartbeatManager.java   | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
index 6027fa908..757ab5bea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
@@ -46,11 +46,13 @@ import javax.annotation.PostConstruct;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-@Component
 @Slf4j
+@Component
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
     private static final String AUTO_REGISTERED = "auto registered";
+    private static final int UPDATED_ONE_ROW = 1; // updated one row
+    private static final int UPDATE_ZERO_ROW = 0; // no field updated
 
     @Getter
     private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
@@ -92,18 +94,27 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
                     componentHeartbeat.getComponentType());
             return;
         }
+
+        // if the heartbeat was not in the cache, insert or update the node by the heartbeat info
         HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
+        boolean exist = true;
+        int updateNum = UPDATE_ZERO_ROW;
         if (lastHeartbeat == null) {
+            exist = false;
             InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
             if (clusterNode == null) {
-                insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
-                log.info("insert node success");
+                updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
+                log.info("insert node result: {}", updateNum);
             } else {
-                updateClusterNode(clusterNode);
-                log.info("update node success");
+                updateNum = updateClusterNode(clusterNode);
+                log.info("update node result: {}", updateNum);
             }
         }
-        heartbeatCache.put(componentHeartbeat, heartbeat);
+
+        // if the heartbeat is already cached, or was not cached but the insert/update succeeded, put it into the cache
+        if (exist || updateNum == UPDATED_ONE_ROW) {
+            heartbeatCache.put(componentHeartbeat, heartbeat);
+        }
     }
 
     private void evictClusterNode(HeartbeatMsg heartbeat) {
@@ -134,7 +145,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         return clusterNodeMapper.selectByUniqueKey(nodeRequest);
     }
 
-    private void insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
+    private int insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
         InlongClusterNodeEntity clusterNode = new InlongClusterNodeEntity();
         clusterNode.setParentId(clusterInfo.getId());
         clusterNode.setType(heartbeat.getComponentType());
@@ -144,12 +155,12 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         clusterNode.setCreator(creator);
         clusterNode.setModifier(creator);
         clusterNode.setDescription(AUTO_REGISTERED);
-        clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
+        return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
     }
 
-    private void updateClusterNode(InlongClusterNodeEntity clusterNode) {
+    private int updateClusterNode(InlongClusterNodeEntity clusterNode) {
         clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
-        clusterNodeMapper.updateById(clusterNode);
+        return clusterNodeMapper.updateById(clusterNode);
     }
 
     private ClusterInfo fetchCluster(ComponentHeartbeat componentHeartbeat) {


[inlong] 02/02: [INLONG-6171][Manager] Fix the lightweight group union nodes relation error (#6172)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 2247d7d5b0150d4984a6c518af6ee396d6287185
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Oct 13 17:45:17 2022 +0800

    [INLONG-6171][Manager] Fix the lightweight group union nodes relation error (#6172)
---
 .../service/resource/sort/DefaultSortConfigOperator.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 6096777ac..a4c6b5d80 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -123,18 +123,22 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             List<StreamSource> sources = sourceMap.get(streamId);
             List<StreamSink> sinks = sinkMap.get(streamId);
             List<NodeRelation> relations;
-            if (CollectionUtils.isEmpty(transformResponses)) {
-                relations = NodeRelationUtils.createNodeRelations(sources, sinks);
-            } else {
-                relations = NodeRelationUtils.createNodeRelations(inlongStream);
-                // in standard mode, replace upstream source node and transform input fields node to mq node
-                if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
+
+            if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
+                if (CollectionUtils.isNotEmpty(transformResponses)) {
+                    relations = NodeRelationUtils.createNodeRelations(inlongStream);
+
+                    // in standard mode, replace upstream source node and transform input fields node to mq node
                     // mq node name, which is inlong stream id
                     String mqNodeName = sources.get(0).getSourceName();
                     Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses);
                     adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
                     adjustNodeRelations(relations, nodeNameSet, mqNodeName);
+                } else {
+                    relations = NodeRelationUtils.createNodeRelations(sources, sinks);
                 }
+            } else {
+                relations = NodeRelationUtils.createNodeRelations(inlongStream);
             }
 
             // create extract-transform-load nodes