You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/08/03 14:41:18 UTC

[GitHub] [inlong] leosanqing commented on a diff in pull request #5332: [INLONG-5222][Manager][Agent][DataProxy] Add heartbeat mechanism for Inlong component cluster

leosanqing commented on code in PR #5332:
URL: https://github.com/apache/inlong/pull/5332#discussion_r936751297


##########
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java:
##########
@@ -106,6 +176,50 @@ private TaskSnapshotRequest getHeartBeat() {
         return taskSnapshotRequest;
     }
 
+    /**
+     * build heartbeat message of agent
+     */
+    private HeartbeatMsg buildHeartbeatMsg() {
+        HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
+        final String agentIp = AgentUtils.fetchLocalIp();
+        final int agentPort = conf.getInt(AGENT_HTTP_PORT, DEFAULT_AGENT_HTTP_PORT);
+        final String clusterName = conf.get(AGENT_CLUSTER_NAME);
+        final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
+        heartbeatMsg.setIp(agentIp);
+        heartbeatMsg.setPort(agentPort);
+        heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getName());
+        heartbeatMsg.setReportTime(System.currentTimeMillis());
+        if (StringUtils.isNotBlank(clusterName)) {
+            heartbeatMsg.setClusterName(clusterName);
+        }
+        if (StringUtils.isNotBlank(clusterTag)) {
+            heartbeatMsg.setClusterTag(clusterTag);
+        }
+        Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
+        List<GroupHeartbeat> groupHeartbeats = Lists.newArrayList();
+        List<StreamHeartbeat> streamHeartbeats = Lists.newArrayList();
+        jobWrapperMap.values().stream().forEach(jobWrapper -> {
+            Job job = jobWrapper.getJob();
+            JobProfile jobProfile = job.getJobConf();
+            final String groupId = jobProfile.get(JOB_GROUP_ID);
+            final String streamId = jobProfile.get(JOB_STREAM_ID);
+            State currentState = jobWrapper.getCurrentState();
+            String status = currentState.name();
+            GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
+            groupHeartbeat.setInlongGroupId(groupId);
+            groupHeartbeat.setStatus(status);
+            groupHeartbeats.add(groupHeartbeat);
+            StreamHeartbeat streamHeartbeat = new StreamHeartbeat();

Review Comment:
   It is better to have a blank line between the fields of the group and the fields of the stream, for readability



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org