You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/07 11:48:17 UTC
[inlong] branch master updated: [INLONG-6439][DataProxy] Added service status and load fields in heartbeat request (#6441)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d1db7d6f4 [INLONG-6439][DataProxy] Added service status and load fields in heartbeat request (#6441)
d1db7d6f4 is described below
commit d1db7d6f4831bdeb5ebd77d5bb1178ec8b236a5b
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Nov 7 19:48:11 2022 +0800
[INLONG-6439][DataProxy] Added service status and load fields in heartbeat request (#6441)
---
.../apache/inlong/common/enums/NodeSrvStatus.java | 57 ++++++++++++++++++++++
.../common/heartbeat/ComponentHeartbeat.java | 29 ++++++++++-
.../inlong/common/heartbeat/HeartbeatMsg.java | 15 +++++-
.../dataproxy/heartbeat/HeartbeatManager.java | 4 ++
4 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/NodeSrvStatus.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/NodeSrvStatus.java
new file mode 100644
index 000000000..973dd00fa
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/NodeSrvStatus.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Enum of node service status.
+ */
+public enum NodeSrvStatus {
+
+ OK(0, "Service ok"),
+
+ SERVICE_UNREADY(1, "Service not ready"),
+
+ UNKNOWN_ERROR(Integer.MAX_VALUE - 1, "Unknown status");
+
+ private final int statusId;
+ private final String statusDesc;
+
+ NodeSrvStatus(int statusId, String statusDesc) {
+ this.statusId = statusId;
+ this.statusDesc = statusDesc;
+ }
+
+ public static NodeSrvStatus valueOf(int value) {
+ for (NodeSrvStatus nodeStatus : NodeSrvStatus.values()) {
+ if (nodeStatus.getStatusId() == value) {
+ return nodeStatus;
+ }
+ }
+
+ return UNKNOWN_ERROR;
+ }
+
+ public int getStatusId() {
+ return statusId;
+ }
+
+ public String getStatusDesc() {
+ return statusDesc;
+ }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 266a05ce2..a7e301abd 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -17,6 +17,7 @@
package org.apache.inlong.common.heartbeat;
+import org.apache.inlong.common.enums.NodeSrvStatus;
import lombok.Data;
/**
@@ -25,6 +26,9 @@ import lombok.Data;
@Data
public class ComponentHeartbeat {
+ // node service status
+ private NodeSrvStatus nodeSrvStatus;
+
private String clusterTag;
private String clusterName;
@@ -39,11 +43,31 @@ public class ComponentHeartbeat {
private String inCharges;
+ // node load
+ private Integer load;
+
public ComponentHeartbeat() {
}
- public ComponentHeartbeat(String clusterTag, String clusterName, String componentType, String ip, String port,
- String inCharges, String protocolType) {
+ public ComponentHeartbeat(String clusterTag, String clusterName,
+ String componentType, String ip, String port,
+ String inCharges, String protocolType) {
+ this.nodeSrvStatus = NodeSrvStatus.OK;
+ this.clusterTag = clusterTag;
+ this.clusterName = clusterName;
+ this.componentType = componentType;
+ this.ip = ip;
+ this.port = port;
+ this.protocolType = protocolType;
+ this.inCharges = inCharges;
+ this.load = 0xffff;
+ }
+
+ public ComponentHeartbeat(NodeSrvStatus nodeSrvStatus,
+ String clusterTag, String clusterName,
+ String componentType, String ip, String port,
+ String inCharges, String protocolType, int loadValue) {
+ this.nodeSrvStatus = nodeSrvStatus;
this.clusterTag = clusterTag;
this.clusterName = clusterName;
this.componentType = componentType;
@@ -51,5 +75,6 @@ public class ComponentHeartbeat {
this.port = port;
this.protocolType = protocolType;
this.inCharges = inCharges;
+ this.load = loadValue;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index d131b98f2..c386c9b60 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -17,6 +17,8 @@
package org.apache.inlong.common.heartbeat;
+import org.apache.inlong.common.enums.NodeSrvStatus;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -33,6 +35,11 @@ import java.util.List;
@NoArgsConstructor
public class HeartbeatMsg {
+ /**
+ * Node service status
+ */
+ private NodeSrvStatus nodeSrvStatus = NodeSrvStatus.OK;
+
/**
* Ip of component
*/
@@ -83,7 +90,13 @@ public class HeartbeatMsg {
*/
private List<StreamHeartbeat> streamHeartbeats;
+ /**
+ * node load value
+ */
+ private Integer load = 0xffff;
+
public ComponentHeartbeat componentHeartbeat() {
- return new ComponentHeartbeat(clusterTag, clusterName, componentType, ip, port, inCharges, protocolType);
+ return new ComponentHeartbeat(nodeSrvStatus, clusterTag, clusterName,
+ componentType, ip, port, inCharges, protocolType, load);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index fb6c025c1..114d2f636 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -30,6 +30,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
@@ -127,11 +128,14 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
if (!validReportInfo(reportInfo)) {
return null;
}
+ heartbeatMsg.setNodeSrvStatus(ConfigManager.getInstance().isMqClusterReady()
+ ? NodeSrvStatus.OK : NodeSrvStatus.SERVICE_UNREADY);
heartbeatMsg.setIp(reportInfo.getIp());
heartbeatMsg.setPort(reportInfo.getPort());
heartbeatMsg.setProtocolType(reportInfo.getProtocolType());
heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
+ heartbeatMsg.setLoad(0xffff);
Map<String, String> commonProperties = configManager.getCommonProperties();
heartbeatMsg.setClusterTag(commonProperties.getOrDefault(
ConfigConstants.PROXY_CLUSTER_TAG, DEFAULT_CLUSTER_TAG));