You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/28 09:09:56 UTC

[GitHub] [incubator-inlong] pocozh opened a new pull request #3403: [INLONG-3387][dataproxy-sdk] Strengthen the strategy of heartbeat and channel update

pocozh opened a new pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403


   
   ### Title Name: [INLONG-3387][dataproxy-sdk] Strengthen the strategy of heartbeat and channel update
   
   where *XYZ* should be replaced by the actual issue number.
   
   Fixes #3387 
   
   ### Modifications
   
   - send heartbeat for idle dataChannel to keep alive
   - reuse bad host while dataproxy hosts are rare, avoid there is no dataChannel
   - improve code style
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] pocozh commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
pocozh commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r837010262



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {
+                logger.debug("active host to send heartbeat! {}", hostInfo.getReferenceName());
+                EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
+                        8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
+                if (configure.isNeedAuthentication()) {

Review comment:
       > Heartbeat data miss HostInfo data.
   
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] pocozh commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
pocozh commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r837006865



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
##########
@@ -598,21 +606,21 @@ private ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Excep
             if (jsonItem != null) {
                 if (!jsonItem.has("port")) {
                     throw new Exception("Parse local proxyList failure: "
-                        + "port field is not exist in address(" + i + ")!");
+                            + "port field is not exist in address(" + i + ")!");

Review comment:
       Using tencent-code-style template, long line break is 8 space




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] pocozh commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
pocozh commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r837010082



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {

Review comment:
       done

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {

Review comment:
       > It is better to add a log when client is not active.
   
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] dockerzhang merged pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
dockerzhang merged pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r836987075



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
##########
@@ -598,21 +606,21 @@ private ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Excep
             if (jsonItem != null) {
                 if (!jsonItem.has("port")) {
                     throw new Exception("Parse local proxyList failure: "
-                        + "port field is not exist in address(" + i + ")!");
+                            + "port field is not exist in address(" + i + ")!");

Review comment:
       Why is indentation 8 space?

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -650,6 +666,25 @@ private void fillUpWorkClientWithLastBadClient() {
         }
     }
 
+    private void fillUpWorkClientWithBadClient(List<HostInfo> badHostLists) {
+        if (badHostLists.isEmpty()) {

Review comment:
       It is good thing that badHostLists is empty, why use warn level log?

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {
+                logger.debug("active host to send heartbeat! {}", hostInfo.getReferenceName());
+                EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
+                        8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
+                if (configure.isNeedAuthentication()) {

Review comment:
       Heartbeat data miss HostInfo data.

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -572,29 +577,40 @@ public int compare(Map.Entry<HostInfo, Integer> o1, Map.Entry<HostInfo, Integer>
     }
 
     private void sendHeartBeat() {
-        for (HostInfo hostInfo : clientMap.keySet()) {
-            if (hostInfo == null) {
-                continue;
+        // all hbChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapHB.entrySet()) {
+            if (clientEntry.getKey() != null && clientEntry.getValue() != null) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
             }
-            NettyClient client = clientMap.get(hostInfo);
-            if (client == null) {
+        }
+
+        // only idle dataChannels need hb
+        for (Map.Entry<HostInfo, NettyClient> clientEntry : clientMapData.entrySet()) {
+            if (clientEntry.getKey() == null || clientEntry.getValue() == null) {
                 continue;
             }
-            try {
-                if (client.isActive()) {
-                    //logger.info("active host to send heartbeat! {}", entry.getKey().getHostName());
-                    EncodeObject encodeObject = new EncodeObject("heartbeat".getBytes(StandardCharsets.UTF_8),
-                            8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", "");
-                    if (configure.isNeedAuthentication()) {
-                        encodeObject.setAuth(configure.isNeedAuthentication(),
-                                configure.getUserName(), configure.getSecretKey());
-                    }
-                    client.write(encodeObject);
+            if (sender.isIdleClient(clientEntry.getValue())) {
+                sendHeartBeat(clientEntry.getKey(), clientEntry.getValue());
+            }
+        }
+
+    }
+
+    private void sendHeartBeat(HostInfo hostInfo, NettyClient client) {
+        try {
+            if (client.isActive()) {

Review comment:
       It is better to add a log when client is not active.

##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
##########
@@ -869,7 +876,7 @@ private StringEntity getEntity(List<BasicNameValuePair> params)
     }
 
     private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> params)
-        throws NoSuchAlgorithmException, KeyManagementException {
+            throws NoSuchAlgorithmException, KeyManagementException {

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] pocozh commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
pocozh commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r837007071



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
##########
@@ -598,21 +606,21 @@ private ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Excep
             if (jsonItem != null) {
                 if (!jsonItem.has("port")) {
                     throw new Exception("Parse local proxyList failure: "
-                        + "port field is not exist in address(" + i + ")!");
+                            + "port field is not exist in address(" + i + ")!");

Review comment:
       > Why is indentation 8 space?
   
   Using tencent-code-style template, long line break is 8 space




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] pocozh commented on a change in pull request #3403: [INLONG-3387][SDK] Strengthen the strategy of heartbeat and channel update for dataproxy

Posted by GitBox <gi...@apache.org>.
pocozh commented on a change in pull request #3403:
URL: https://github.com/apache/incubator-inlong/pull/3403#discussion_r837008463



##########
File path: inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
##########
@@ -650,6 +666,25 @@ private void fillUpWorkClientWithLastBadClient() {
         }
     }
 
+    private void fillUpWorkClientWithBadClient(List<HostInfo> badHostLists) {
+        if (badHostLists.isEmpty()) {

Review comment:
       > It is good thing that badHostLists is empty, why use warn level log?
   
   Invoking this method means that all hosts are bad, so badHostLists should not be empty




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org