Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/14 07:49:04 UTC

[GitHub] [inlong] GanfengTan opened a new pull request, #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

GanfengTan opened a new pull request, #6184:
URL: https://github.com/apache/inlong/pull/6184

   Support node protocol reporting and query.
   
   1. DataProxy supports reporting over the HTTP and TCP protocols.
   2. Agent supports querying DataProxy (dp) nodes by protocol.
   3. Manager client supports querying dp nodes by protocol and groupId.
   
   - Fixes #6181 
   
   ### Motivation
   
   1. This PR adds protocol support for dp nodes, for example: tcp, http.
   2. Agent queries dp nodes through the dp SDK.
   3. Manager client queries cluster nodes by protocol.
   
   ### Modifications
   
   Agent: update the sender manager.
   DataProxy: multi-protocol reporting.
   SDK: update the dp-config parameters.
   
   ### Verifying this change
   
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995533603


##########
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java:
##########
@@ -49,46 +51,58 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
 
     @Test
     void testReportHeartbeat() throws InterruptedException {
-        HeartbeatMsg msg = createHeartbeatMsg();
-        heartbeatManager.reportHeartbeat(msg);
+        List<HeartbeatMsg> heartbeatMsgs = createHeartbeatMsg();
+        heartbeatManager.reportHeartbeat(heartbeatMsgs);
         InlongClusterEntity entity = clusterMapper.selectById(1);
         log.info(JsonUtils.toJsonString(entity));
-        List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(null, msg.getClusterName(),
-                msg.getComponentType());
-        Assertions.assertEquals(1, clusterEntities.size());
-        Assertions.assertEquals(clusterEntities.get(0).getName(), msg.getClusterName());
+        for (HeartbeatMsg msg : heartbeatMsgs) {
+            if (null == msg.getProtocolType() || StringUtils.isBlank(msg.getProtocolType())) {

Review Comment:
   done





[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997797835


##########
inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java:
##########
@@ -33,7 +33,7 @@ public class ComponentHeartbeat {
 
     private String ip;
 
-    private int port;
+    private String port;

Review Comment:
   Why change the port to a String type? Isn't Integer enough?
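
A possible answer, judging only from the `HeartbeatManager` hunk quoted elsewhere in this thread: that hunk calls `heartbeat.getPort().split(InlongConstants.COMMA)`, so a String port can carry several comma-separated ports, each optionally paired with a protocol type. A minimal, self-contained sketch of that pairing follows. The class `PortProtocolSplit`, the `Endpoint` holder, and the length check are assumptions made for illustration and are not InLong code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pairs comma-separated ports with comma-separated protocol types.
final class PortProtocolSplit {

    /** One (port, protocol) pair; a holder invented for this sketch. */
    static final class Endpoint {
        final int port;
        final String protocol;

        Endpoint(int port, String protocol) {
            this.port = port;
            this.protocol = protocol;
        }
    }

    static List<Endpoint> split(String ports, String protocolTypes) {
        String[] portArr = ports.split(",");
        boolean hasProtocols = protocolTypes != null && !protocolTypes.trim().isEmpty();
        // As in the quoted hunk, protocol types are only split when several ports are present.
        String[] protoArr = hasProtocols && portArr.length > 1 ? protocolTypes.split(",") : null;
        // Assumption of this sketch: reject mismatched lengths instead of failing on an index later.
        if (protoArr != null && protoArr.length != portArr.length) {
            throw new IllegalArgumentException("ports and protocol types differ in length");
        }
        List<Endpoint> result = new ArrayList<>();
        for (int i = 0; i < portArr.length; i++) {
            String protocol = protoArr != null ? protoArr[i].trim() : protocolTypes;
            result.add(new Endpoint(Integer.parseInt(portArr[i].trim()), protocol));
        }
        return result;
    }
}
```

With an `int` port, the same information would need either a list field or one heartbeat per port.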





[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997805812


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterPageRequest.java:
##########
@@ -60,6 +60,9 @@ public class ClusterPageRequest extends PageRequest {
     @ApiModelProperty(value = "Extend tag")
     private String extTag;
 
+    @ApiModelProperty(value = "Protocol type for DATAPROXY, for example: http,tcp")

Review Comment:
   ```suggestion
       @ApiModelProperty(value = "Protocol type, such as: TCP, HTTP")
   ```





[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997804725


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,36 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
 
         // if the heartbeat was not in the cache, insert or update the node by the heartbeat info
         HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
-        boolean exist = true;
-        int updateNum = UPDATE_ZERO_ROW;
-        if (lastHeartbeat == null) {
-            exist = false;
-            InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
-            if (clusterNode == null) {
-                updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
-                log.info("insert node result: {}", updateNum);
-            } else {
-                updateNum = updateClusterNode(clusterNode);
-                log.info("update node result: {}", updateNum);
+        String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+        String protocolType = heartbeat.getProtocolType();
+        String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+                ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+        int handlerNum = 0;
+        for (int i = 0; i < ports.length; i++) {
+            HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(

Review Comment:
   Suggest using `JsonUtils` instead of a new `ObjectMapper`.







[GitHub] [inlong] dockerzhang commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996660504


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java:
##########
@@ -69,27 +71,36 @@ public class HeartbeatServiceImpl implements HeartbeatService {
     private StreamHeartbeatEntityMapper streamHeartbeatMapper;
 
     @Override
-    public Boolean reportHeartbeat(HeartbeatReportRequest request) {
-        if (request == null || StringUtils.isBlank(request.getComponentType())) {
-            log.warn("request is null or component null, just return");
+    public Boolean reportHeartbeat(List<HeartbeatReportRequest> requests) {
+        if (requests == null || requests.isEmpty()) {
             return false;
         }
-        if (log.isDebugEnabled()) {
-            log.debug("received heartbeat: " + request);
-        }
-        heartbeatManager.reportHeartbeat(request);
-        ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
-        switch (componentType) {
-            case Sort:
-            case DataProxy:
-            case Agent:
-            case Cache:
-            case SDK:
-                return updateHeartbeatOpt(request);
-            default:
-                log.error("Unsupported componentType={} for Inlong", componentType);
-                return false;
+        List<HeartbeatMsg> heartbeatMsgs = new ArrayList<>();
+        for (HeartbeatReportRequest request : requests) {
+            if (request == null || StringUtils.isBlank(request.getComponentType())) {
+                log.warn("request is null or component null, just return");
+                continue;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("received heartbeat: " + request);
+            }
+            heartbeatMsgs.add(request);
+            ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                case SDK:

Review Comment:
   Why does `updateHeartbeatOpt` only support SDK?





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996662247


##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,16 @@ manager.hosts=127.0.0.1:8083
 manager.auth.secretId=
 manager.auth.secretKey=
 proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+#support HTTP or TCP, default TCP. for example: proxy.report.protocol.type=TCP, proxy.report.protocol.type=HTTP, proxy.report.protocol.type=TCP,HTTP
+proxy.report.protocol.type=TCP

Review Comment:
   done
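
As an illustration of the comment in the hunk above (HTTP or TCP, default TCP, comma-separated when both are reported), here is a hedged sketch of how such a property could be read. Only the key `proxy.report.protocol.type` and its documented values come from the diff; the class `ReportProtocolConfig`, the case normalization, and the error handling are assumptions, not the DataProxy implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical reader for proxy.report.protocol.type; not taken from InLong.
final class ReportProtocolConfig {

    static final String KEY = "proxy.report.protocol.type";
    static final String DEFAULT_PROTOCOL = "TCP";

    static List<String> protocolTypes(Properties props) {
        String raw = props.getProperty(KEY, DEFAULT_PROTOCOL);
        List<String> result = new ArrayList<>();
        for (String part : raw.split(",")) {
            String type = part.trim().toUpperCase(Locale.ROOT);
            if (type.isEmpty()) {
                continue; // tolerate "TCP," or an empty value
            }
            if (!type.equals("TCP") && !type.equals("HTTP")) {
                throw new IllegalArgumentException("Unsupported protocol type: " + type);
            }
            if (!result.contains(type)) {
                result.add(type);
            }
        }
        if (result.isEmpty()) {
            result.add(DEFAULT_PROTOCOL); // fall back to the documented default
        }
        return result;
    }
}
```

A caller would load `common.properties` into a `java.util.Properties` object and pass it in; a missing or empty value yields `[TCP]`.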





[GitHub] [inlong] GanfengTan commented on pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on PR #6184:
URL: https://github.com/apache/inlong/pull/6184#issuecomment-1278729847

   > For configuration, I don't know whether any users report only over HTTP and not TCP. If so, the docker script should add some handling for that.
   Thanks for the suggestion.
   




[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996999607


##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -21,6 +21,9 @@ local_ip=$(ifconfig | grep inet | grep -v inet6 | grep -v "127.0.0.1" | awk '{pr
 # config
 cd "${file_path}/"
 common_conf_file=./conf/common.properties
+pulsar_conf_file=./conf/dataproxy-pulsar.conf
+tube_conf_file=./conf/dataproxy-tube.conf

Review Comment:
   done





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][DataProxy][Manager] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997907438


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,36 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
 
         // if the heartbeat was not in the cache, insert or update the node by the heartbeat info
         HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
-        boolean exist = true;
-        int updateNum = UPDATE_ZERO_ROW;
-        if (lastHeartbeat == null) {
-            exist = false;
-            InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
-            if (clusterNode == null) {
-                updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
-                log.info("insert node result: {}", updateNum);
-            } else {
-                updateNum = updateClusterNode(clusterNode);
-                log.info("update node result: {}", updateNum);
+        String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+        String protocolType = heartbeat.getProtocolType();
+        String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+                ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+        int handlerNum = 0;
+        for (int i = 0; i < ports.length; i++) {
+            HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(

Review Comment:
   https://github.com/apache/inlong/pull/6184/commits/9c88ea1bba0c93c3eba399f38c58bdf8b6330ebf





[GitHub] [inlong] gosonzhang commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996659288


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java:
##########
@@ -79,7 +84,7 @@ public void start() {
     }
 
     @Override
-    public void reportHeartbeat(HeartbeatMsg heartbeat) {
+    public void reportHeartbeat(List<HeartbeatMsg> heartbeat) {

Review Comment:
   Why report multiple heartbeats?
   
   A DataProxy node can start multiple different sources, but it does not need to construct a heartbeat message for each source.
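
One way to read this suggestion, sketched under assumptions: a node with several sources sends a single heartbeat whose port and protocol fields are comma-joined, which matches the comma-splitting visible in the Manager-side `HeartbeatManager` hunk. The class `HeartbeatFields` and its method are hypothetical and only show the joining step.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: folds several (protocol, port) sources into one pair of heartbeat fields.
final class HeartbeatFields {

    /** Returns {ports, protocolTypes}, each comma-joined in the map's iteration order. */
    static String[] join(Map<String, Integer> protocolToPort) {
        StringJoiner ports = new StringJoiner(",");
        StringJoiner protocols = new StringJoiner(",");
        for (Map.Entry<String, Integer> entry : protocolToPort.entrySet()) {
            protocols.add(entry.getKey());
            ports.add(String.valueOf(entry.getValue()));
        }
        return new String[] {ports.toString(), protocols.toString()};
    }
}
```

Passing an insertion-ordered map such as `LinkedHashMap` keeps each port aligned with its protocol by position.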





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997000426


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java:
##########
@@ -69,27 +71,36 @@ public class HeartbeatServiceImpl implements HeartbeatService {
     private StreamHeartbeatEntityMapper streamHeartbeatMapper;
 
     @Override
-    public Boolean reportHeartbeat(HeartbeatReportRequest request) {
-        if (request == null || StringUtils.isBlank(request.getComponentType())) {
-            log.warn("request is null or component null, just return");
+    public Boolean reportHeartbeat(List<HeartbeatReportRequest> requests) {
+        if (requests == null || requests.isEmpty()) {
             return false;
         }
-        if (log.isDebugEnabled()) {
-            log.debug("received heartbeat: " + request);
-        }
-        heartbeatManager.reportHeartbeat(request);
-        ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
-        switch (componentType) {
-            case Sort:
-            case DataProxy:
-            case Agent:
-            case Cache:
-            case SDK:
-                return updateHeartbeatOpt(request);
-            default:
-                log.error("Unsupported componentType={} for Inlong", componentType);
-                return false;
+        List<HeartbeatMsg> heartbeatMsgs = new ArrayList<>();
+        for (HeartbeatReportRequest request : requests) {
+            if (request == null || StringUtils.isBlank(request.getComponentType())) {
+                log.warn("request is null or component null, just return");
+                continue;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("received heartbeat: " + request);
+            }
+            heartbeatMsgs.add(request);
+            ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                case SDK:

Review Comment:
   This has been rolled back to the previous logic.





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995544318


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java:
##########
@@ -62,6 +62,7 @@ public class SenderManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
     private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
     private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+    private static final String PROTOCOL_TYPE_TCP = "tcp";

Review Comment:
   It has been moved.





[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995461153


##########
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java:
##########
@@ -49,46 +51,58 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
 
     @Test
     void testReportHeartbeat() throws InterruptedException {
-        HeartbeatMsg msg = createHeartbeatMsg();
-        heartbeatManager.reportHeartbeat(msg);
+        List<HeartbeatMsg> heartbeatMsgs = createHeartbeatMsg();
+        heartbeatManager.reportHeartbeat(heartbeatMsgs);
         InlongClusterEntity entity = clusterMapper.selectById(1);
         log.info(JsonUtils.toJsonString(entity));
-        List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(null, msg.getClusterName(),
-                msg.getComponentType());
-        Assertions.assertEquals(1, clusterEntities.size());
-        Assertions.assertEquals(clusterEntities.get(0).getName(), msg.getClusterName());
+        for (HeartbeatMsg msg : heartbeatMsgs) {
+            if (null == msg.getProtocolType() || StringUtils.isBlank(msg.getProtocolType())) {

Review Comment:
   ```suggestion
               if (StringUtils.isBlank(msg.getProtocolType())) {
   ```
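
The suggestion works because `StringUtils.isBlank` in Apache Commons Lang already returns `true` for `null`, so the explicit `null ==` test adds nothing. The sketch below demonstrates the equivalence with a local stand-in; `BlankCheck.isBlank` is written here to mirror the documented behavior and is not the library method itself.

```java
// Local stand-in for the documented semantics of StringUtils.isBlank (null, empty, or whitespace only).
final class BlankCheck {

    static boolean isBlank(CharSequence cs) {
        if (cs == null || cs.length() == 0) {
            return true;
        }
        for (int i = 0; i < cs.length(); i++) {
            if (!Character.isWhitespace(cs.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** The original condition from the test, kept only for comparison. */
    static boolean redundantCheck(String s) {
        return null == s || isBlank(s);
    }
}
```

For every input, `redundantCheck(s)` and `isBlank(s)` give the same result, which is why the shorter form is preferred.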





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995544672


##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,15 @@ manager.hosts=127.0.0.1:8083
 manager.auth.secretId=
 manager.auth.secretKey=
 proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+proxy.report.tcp.protocol=tcp
 # proxy cluster name
 proxy.cluster.name=default_dataproxy
 proxy.cluster.tag=default_cluster
 proxy.cluster.inCharges=admin
 # check interval of local config (millisecond)
 configCheckInterval=10000
-

Review Comment:
   This has been moved to another PR.





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997751319


##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -29,6 +32,17 @@ sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_co
 sed -i "s/proxy.cluster.name=.*$/proxy.cluster.name=${CLUSTER_NAME}/g" "${common_conf_file}"
 sed -i "s/proxy.cluster.inCharges=.*$/proxy.cluster.inCharges=${CLUSTER_IN_CHARGES}/g" "${common_conf_file}"
 
+if [ "${PROTOCOL_HTTP}" = "http" ]; then
+  echo "proxy.report.http.protocol=${PROTOCOL_HTTP}" >> "${common_conf_file}"
+  if [ "${MQ_TYPE}" = "tubemq" ]; then
+  sed -i "s/agent1.sources=.*$/agent1.sources=tcp-source http-source/g" "${tube_conf_file}"
+  sed -n '29~49p' ${http_conf_file} >> ${tube_conf_file}
+ else 

Review Comment:
   Has been removed.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,35 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
 
         // if the heartbeat was not in the cache, insert or update the node by the heartbeat info
         HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
-        boolean exist = true;
-        int updateNum = UPDATE_ZERO_ROW;
-        if (lastHeartbeat == null) {
-            exist = false;
-            InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
-            if (clusterNode == null) {
-                updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
-                log.info("insert node result: {}", updateNum);
-            } else {
-                updateNum = updateClusterNode(clusterNode);
-                log.info("update node result: {}", updateNum);
+        String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+        String protocolType = heartbeat.getProtocolType();
+        String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+                ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+        for (int i = 0; i < ports.length; i++) {
+            HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(
+                    OBJECT_MAPPER.writeValueAsBytes(heartbeat), HeartbeatMsg.class);
+            heartbeatMsg.setPort(ports[i].trim());
+            heartbeatMsg.setProtocolType(protocolType);
+            if (protocolTypes != null) {
+                heartbeatMsg.setProtocolType(protocolTypes[i]);
+            }
+            if (lastHeartbeat == null) {
+                InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeatMsg);
+                if (clusterNode == null) {
+                    insertClusterNode(clusterInfo, heartbeatMsg, clusterInfo.getCreator());
+                } else {
+                    updateClusterNode(clusterNode);

Review Comment:
   done





[GitHub] [inlong] healchow merged pull request #6184: [INLONG-6181][Agent][DataProxy][Manager] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow merged PR #6184:
URL: https://github.com/apache/inlong/pull/6184




[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995466764


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java:
##########
@@ -62,6 +62,7 @@ public class SenderManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
     private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
     private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+    private static final String PROTOCOL_TYPE_TCP = "tcp";

Review Comment:
   Suggest moving `org.apache.inlong.manager.common.consts.ProtocolType` to inlong-common (`org.apache.inlong.common.constant`) and using it in Agent, DataProxy, etc.
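
To illustrate what a shared constant buys over a per-module string such as `PROTOCOL_TYPE_TCP = "tcp"`, here is a hypothetical sketch. The real `ProtocolType` class is not shown in this thread, so the enum name `ProtocolTypeSketch`, its values, and `forName` are assumptions chosen for illustration.

```java
import java.util.Locale;

// Hypothetical shared protocol constant; the actual InLong ProtocolType may look different.
enum ProtocolTypeSketch {
    TCP, HTTP;

    /** Case-insensitive lookup, so "tcp" in one module and "TCP" in another resolve to the same value. */
    static ProtocolTypeSketch forName(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("protocol type must not be blank");
        }
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }
}
```

Agent, DataProxy, and Manager could then compare enum values instead of string literals that may differ in case.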





[GitHub] [inlong] lucaspeng12138 commented on pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on PR #6184:
URL: https://github.com/apache/inlong/pull/6184#issuecomment-1278639017

   For configuration, I don't know whether any users report only over HTTP and not TCP. If so, the docker script should add some handling for that.




[GitHub] [inlong] lucaspeng12138 commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997697217


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,35 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
 
         // if the heartbeat was not in the cache, insert or update the node by the heartbeat info
         HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
-        boolean exist = true;
-        int updateNum = UPDATE_ZERO_ROW;
-        if (lastHeartbeat == null) {
-            exist = false;
-            InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
-            if (clusterNode == null) {
-                updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
-                log.info("insert node result: {}", updateNum);
-            } else {
-                updateNum = updateClusterNode(clusterNode);
-                log.info("update node result: {}", updateNum);
+        String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+        String protocolType = heartbeat.getProtocolType();
+        String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+                ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+        for (int i = 0; i < ports.length; i++) {
+            HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(
+                    OBJECT_MAPPER.writeValueAsBytes(heartbeat), HeartbeatMsg.class);
+            heartbeatMsg.setPort(ports[i].trim());
+            heartbeatMsg.setProtocolType(protocolType);
+            if (protocolTypes != null) {
+                heartbeatMsg.setProtocolType(protocolTypes[i]);
+            }
+            if (lastHeartbeat == null) {
+                InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeatMsg);
+                if (clusterNode == null) {
+                    insertClusterNode(clusterInfo, heartbeatMsg, clusterInfo.getCreator());
+                } else {
+                    updateClusterNode(clusterNode);

Review Comment:
   This may trigger the bug reported in https://github.com/apache/inlong/issues/6159. Please check the return value of the MySQL insert/update calls.
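
A minimal sketch of the kind of check being asked for, assuming the concern is that an insert or update can affect zero rows and still be treated as a success. The class `NodeWriteCheck` and the method `writeSucceeded` are hypothetical names for illustration; only `UPDATE_ZERO_ROW` appears in the diff above, and its value of 0 is assumed here.

```java
import java.util.function.IntSupplier;

// Hypothetical helper, not part of the InLong code base.
final class NodeWriteCheck {

    // Assumed to mirror the UPDATE_ZERO_ROW constant from the removed lines.
    static final int UPDATE_ZERO_ROW = 0;

    /**
     * Runs a database write that returns its affected-row count and
     * reports success only when at least one row was written.
     */
    static boolean writeSucceeded(IntSupplier write) {
        int affectedRows = write.getAsInt();
        return affectedRows > UPDATE_ZERO_ROW;
    }
}
```

In the loop above, the suppliers would wrap the `insertClusterNode(...)` and `updateClusterNode(...)` calls, and the caller would skip any follow-up step (for example, refreshing the heartbeat cache) when `writeSucceeded` returns false.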





[GitHub] [inlong] lucaspeng12138 commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995466315


##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -29,6 +32,17 @@ sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_co
 sed -i "s/proxy.cluster.name=.*$/proxy.cluster.name=${CLUSTER_NAME}/g" "${common_conf_file}"
 sed -i "s/proxy.cluster.inCharges=.*$/proxy.cluster.inCharges=${CLUSTER_IN_CHARGES}/g" "${common_conf_file}"
 
+if [ "${PROTOCOL_HTTP}" = "http" ]; then
+  echo "proxy.report.http.protocol=${PROTOCOL_HTTP}" >> "${common_conf_file}"
+  if [ "${MQ_TYPE}" = "tubemq" ]; then
+  sed -i "s/agent1.sources=.*$/agent1.sources=tcp-source http-source/g" "${tube_conf_file}"
+  sed -n '29~49p' ${http_conf_file} >> ${tube_conf_file}
+ else 

Review Comment:
   The code formatting seems misaligned here.





[GitHub] [inlong] dockerzhang commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996652178


##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,16 @@ manager.hosts=127.0.0.1:8083
 manager.auth.secretId=
 manager.auth.secretKey=
 proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+#support HTTP or TCP, default TCP. for example: proxy.report.protocol.type=TCP, proxy.report.protocol.type=HTTP, proxy.report.protocol.type=TCP,HTTP
+proxy.report.protocol.type=TCP

Review Comment:
   ->
   \# support HTTP/TCP/TCP,HTTP
   proxy.report.protocol.type=TCP
   proxy.report.tcp.port=46801
   proxy.report.http.port=46802



##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -21,6 +21,9 @@ local_ip=$(ifconfig | grep inet | grep -v inet6 | grep -v "127.0.0.1" | awk '{pr
 # config
 cd "${file_path}/"
 common_conf_file=./conf/common.properties
+pulsar_conf_file=./conf/dataproxy-pulsar.conf
+tube_conf_file=./conf/dataproxy-tube.conf

Review Comment:
   Here we can add CONFIG_FILE="dataproxy-${MQ_TYPE}.conf" to select the configuration file for each MQ type.





[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995467960


##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,15 @@ manager.hosts=127.0.0.1:8083
 manager.auth.secretId=
 manager.auth.secretKey=
 proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+proxy.report.tcp.protocol=tcp
 # proxy cluster name
 proxy.cluster.name=default_dataproxy
 proxy.cluster.tag=default_cluster
 proxy.cluster.inCharges=admin
 # check interval of local config (millisecond)
 configCheckInterval=10000
-

Review Comment:
   Please keep this blank line; it is more readable.





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997001112


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java:
##########
@@ -79,7 +84,7 @@ public void start() {
     }
 
     @Override
-    public void reportHeartbeat(HeartbeatMsg heartbeat) {
+    public void reportHeartbeat(List<HeartbeatMsg> heartbeat) {

Review Comment:
   Has been removed.





[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997805100


##########
inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java:
##########
@@ -33,7 +33,7 @@ public class ComponentHeartbeat {
 
     private String ip;
 
-    private int port;
+    private String port;

Review Comment:
   DP supports multiple ports.
   ![WeCom screenshot_33dc10fa-fc6b-4ea9-b4ed-040f7d099eac](https://user-images.githubusercontent.com/17596132/196360973-d0f4f58e-d696-4ad4-8f46-4eb768ebc204.png)
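
To illustrate why the field becomes a `String`, here is a rough sketch of how a comma-separated port value could be paired with a comma-separated protocol value. The class `PortProtocolSplit` and the method `pair` are hypothetical and not taken from the PR. Falling back to the raw protocol string when the two lists differ in length is an assumption made here to avoid an out-of-bounds index.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the InLong code base.
final class PortProtocolSplit {

    /**
     * Maps each port to its protocol, in order. When the number of
     * protocols does not match the number of ports, every port gets
     * the unsplit protocol string (which may be null).
     */
    static Map<String, String> pair(String ports, String protocolTypes) {
        String[] portList = ports.split(",");
        String[] protocolList = protocolTypes == null
                ? new String[0]
                : protocolTypes.split(",");
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < portList.length; i++) {
            String protocol = protocolList.length == portList.length
                    ? protocolList[i].trim()
                    : protocolTypes;
            result.put(portList[i].trim(), protocol);
        }
        return result;
    }
}
```

With the default ports from `common.properties`, `pair("46801,46802", "TCP,HTTP")` would map 46801 to TCP and 46802 to HTTP.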
   


