You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/29 02:35:30 UTC

[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

luchunliang commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r836987075



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
##########
@@ -598,21 +606,21 @@ private ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Excep
             if (jsonItem != null) {
                 if (!jsonItem.has("port")) {
                     throw new Exception("Parse local proxyList failure: "
-                        + "port field is not exist in address(" + i + ")!");
+                            + "port field is not exist in address(" + i + ")!");

Review comment:
       Why is the indentation 8 spaces here?

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -650,6 +666,25 @@ private void fillUpWorkClientWithLastBadClient() {
         }
     }
 
+    private void fillUpWorkClientWithBadClient(List<HostInfo> badHostLists) {
+        if (badHostLists.isEmpty()) {

Review comment:
       It is a good thing when badHostLists is empty, so why use a warn-level log here?

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {
+                logger.debug("active host to send heartbeat! {}", hostInfo.getReferenceName());
+                EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
+                        8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
+                if (configure.isNeedAuthentication()) {

Review comment:
       The heartbeat data is missing the HostInfo data.

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {

Review comment:
       It would be better to add a log when the client is not active.

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
##########
@@ -869,7 +876,7 @@ private StringEntity getEntity(List<BasicNameValuePair> params)
     }
 
     private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> params)
-        throws NoSuchAlgorithmException, KeyManagementException {
+            throws NoSuchAlgorithmException, KeyManagementException {

Review comment:
       Ditto: why is the indentation 8 spaces here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org