You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/07 11:48:17 UTC

[inlong] branch master updated: [INLONG-6439][DataProxy] Added service status and load fields in heartbeat request (#6441)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d1db7d6f4 [INLONG-6439][DataProxy] Added service status and load fields in heartbeat request (#6441)
d1db7d6f4 is described below

commit d1db7d6f4831bdeb5ebd77d5bb1178ec8b236a5b
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Nov 7 19:48:11 2022 +0800

    [INLONG-6439][DataProxy] Added service status and load fields in heartbeat request (#6441)
---
 .../apache/inlong/common/enums/NodeSrvStatus.java  | 57 ++++++++++++++++++++++
 .../common/heartbeat/ComponentHeartbeat.java       | 29 ++++++++++-
 .../inlong/common/heartbeat/HeartbeatMsg.java      | 15 +++++-
 .../dataproxy/heartbeat/HeartbeatManager.java      |  4 ++
 4 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/NodeSrvStatus.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/NodeSrvStatus.java
new file mode 100644
index 000000000..973dd00fa
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/NodeSrvStatus.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Enum of node service status.
+ */
+public enum NodeSrvStatus {
+    // Status id 0: the service reports itself as ok.
+    OK(0, "Service ok"),
+    // Status id 1: the service is not ready.
+    SERVICE_UNREADY(1, "Service not ready"),
+    // Catch-all constant; also what valueOf(int) returns for an id no constant carries.
+    UNKNOWN_ERROR(Integer.MAX_VALUE - 1, "Unknown status");
+
+    private final int statusId;
+    private final String statusDesc;
+    // Binds each constant to its numeric id and its description text.
+    NodeSrvStatus(int statusId, String statusDesc) {
+        this.statusId = statusId;
+        this.statusDesc = statusDesc;
+    }
+    // Maps a numeric status id to its constant by scanning values(); never returns null.
+    public static NodeSrvStatus valueOf(int value) {
+        for (NodeSrvStatus nodeStatus : NodeSrvStatus.values()) {
+            if (nodeStatus.getStatusId() == value) {
+                return nodeStatus;
+            }
+        }
+        // No constant matched the given id, so fall back to the catch-all.
+        return UNKNOWN_ERROR;
+    }
+    // Returns the numeric id of this status.
+    public int getStatusId() {
+        return statusId;
+    }
+    // Returns the description text of this status.
+    public String getStatusDesc() {
+        return statusDesc;
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 266a05ce2..a7e301abd 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.common.heartbeat;
 
+import org.apache.inlong.common.enums.NodeSrvStatus;
 import lombok.Data;
 
 /**
@@ -25,6 +26,9 @@ import lombok.Data;
 @Data
 public class ComponentHeartbeat {
 
+    // node service status
+    private NodeSrvStatus nodeSrvStatus;
+
     private String clusterTag;
 
     private String clusterName;
@@ -39,11 +43,31 @@ public class ComponentHeartbeat {
 
     private String inCharges;
 
+    // node load
+    private Integer load;
+
     public ComponentHeartbeat() {
     }
 
-    public ComponentHeartbeat(String clusterTag, String clusterName, String componentType, String ip, String port,
-            String inCharges, String protocolType) {
+    public ComponentHeartbeat(String clusterTag, String clusterName,
+                              String componentType, String ip, String port,
+                              String inCharges, String protocolType) {
+        this.nodeSrvStatus = NodeSrvStatus.OK; // this overload takes no status argument, so it defaults to OK
+        this.clusterTag = clusterTag;
+        this.clusterName = clusterName;
+        this.componentType = componentType;
+        this.ip = ip;
+        this.port = port;
+        this.protocolType = protocolType;
+        this.inCharges = inCharges;
+        this.load = 0xffff; // no load argument here; matches HeartbeatMsg's default. NOTE(review): meaning of 0xffff is undocumented - confirm
+    }
+
+    public ComponentHeartbeat(NodeSrvStatus nodeSrvStatus,
+                              String clusterTag, String clusterName,
+                              String componentType, String ip, String port,
+                              String inCharges, String protocolType, int loadValue) {
+        this.nodeSrvStatus = nodeSrvStatus;
         this.clusterTag = clusterTag;
         this.clusterName = clusterName;
         this.componentType = componentType;
@@ -51,5 +75,6 @@ public class ComponentHeartbeat {
         this.port = port;
         this.protocolType = protocolType;
         this.inCharges = inCharges;
+        this.load = loadValue;
     }
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index d131b98f2..c386c9b60 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.common.heartbeat;
 
+import org.apache.inlong.common.enums.NodeSrvStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -33,6 +35,11 @@ import java.util.List;
 @NoArgsConstructor
 public class HeartbeatMsg {
 
+    /**
+     * Node service status
+     */
+    private NodeSrvStatus nodeSrvStatus = NodeSrvStatus.OK;
+
     /**
      * Ip of component
      */
@@ -83,7 +90,13 @@ public class HeartbeatMsg {
      */
     private List<StreamHeartbeat> streamHeartbeats;
 
+    /**
+     * node load value
+     */
+    private Integer load = 0xffff;
+
     public ComponentHeartbeat componentHeartbeat() {
-        return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges, protocolType);
+        return new ComponentHeartbeat(nodeSrvStatus, clusterTag, clusterName,
+                componentType, ip, port, inCharges, protocolType, load);
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index fb6c025c1..114d2f636 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -30,6 +30,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.GroupHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
@@ -127,11 +128,14 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         if (!validReportInfo(reportInfo)) {
             return null;
         }
+        heartbeatMsg.setNodeSrvStatus(ConfigManager.getInstance().isMqClusterReady()
+                ? NodeSrvStatus.OK : NodeSrvStatus.SERVICE_UNREADY);
         heartbeatMsg.setIp(reportInfo.getIp());
         heartbeatMsg.setPort(reportInfo.getPort());
         heartbeatMsg.setProtocolType(reportInfo.getProtocolType());
         heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
         heartbeatMsg.setReportTime(System.currentTimeMillis());
+        heartbeatMsg.setLoad(0xffff);
         Map<String, String> commonProperties = configManager.getCommonProperties();
         heartbeatMsg.setClusterTag(commonProperties.getOrDefault(
                 ConfigConstants.PROXY_CLUSTER_TAG, DEFAULT_CLUSTER_TAG));