Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/14 07:49:04 UTC
[GitHub] [inlong] GanfengTan opened a new pull request, #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
GanfengTan opened a new pull request, #6184:
URL: https://github.com/apache/inlong/pull/6184
Support node protocol reporting and query.
1. DataProxy supports HTTP and TCP protocol reporting.
2. Agent supports querying DataProxy nodes by protocol.
3. Manager client supports querying DataProxy nodes by protocol and groupId.
- Fixes #6181
### Motivation
1. This PR adds protocol support for DataProxy nodes, for example: TCP, HTTP.
2. Agent queries DataProxy nodes through the DataProxy SDK.
3. Manager client queries cluster nodes by protocol.
### Modifications
Agent sends requests to Manager.
DataProxy reports multiple protocols.
Update the SDK parameter of dp-config.
### Verifying this change
- [x] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as:
*(please describe tests)*
- [ ] This change added tests and can be verified as follows:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995533603
##########
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java:
##########
@@ -49,46 +51,58 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
@Test
void testReportHeartbeat() throws InterruptedException {
- HeartbeatMsg msg = createHeartbeatMsg();
- heartbeatManager.reportHeartbeat(msg);
+ List<HeartbeatMsg> heartbeatMsgs = createHeartbeatMsg();
+ heartbeatManager.reportHeartbeat(heartbeatMsgs);
InlongClusterEntity entity = clusterMapper.selectById(1);
log.info(JsonUtils.toJsonString(entity));
- List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(null, msg.getClusterName(),
- msg.getComponentType());
- Assertions.assertEquals(1, clusterEntities.size());
- Assertions.assertEquals(clusterEntities.get(0).getName(), msg.getClusterName());
+ for (HeartbeatMsg msg : heartbeatMsgs) {
+ if (null == msg.getProtocolType() || StringUtils.isBlank(msg.getProtocolType())) {
Review Comment:
done
[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997797835
##########
inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java:
##########
@@ -33,7 +33,7 @@ public class ComponentHeartbeat {
private String ip;
- private int port;
+ private String port;
Review Comment:
Why change the port to the String type? Isn't Integer enough?
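One possible reason for the String type, judging from the `heartbeat.getPort().split(InlongConstants.COMMA)` call elsewhere in this PR, is that the field may carry several comma-separated ports. A minimal sketch under that assumption follows; the class `PortList` and its `parse` method are hypothetical and not part of InLong.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of InLong: parses a comma-separated port string
// such as "46801,46802" while still validating each entry as an integer port.
final class PortList {

    static List<Integer> parse(String ports) {
        List<Integer> result = new ArrayList<>();
        if (ports == null || ports.trim().isEmpty()) {
            return result;
        }
        for (String part : ports.split(",")) {
            // parseInt throws NumberFormatException on non-numeric input
            int port = Integer.parseInt(part.trim());
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + port);
            }
            result.add(port);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("46801, 46802"));
    }
}
```

With this shape the field stays a String on the wire, but each port is still checked as an integer before use.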
[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997805812
##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterPageRequest.java:
##########
@@ -60,6 +60,9 @@ public class ClusterPageRequest extends PageRequest {
@ApiModelProperty(value = "Extend tag")
private String extTag;
+ @ApiModelProperty(value = "Protocol type for DATAPROXY, for example: http,tcp")
Review Comment:
```suggestion
@ApiModelProperty(value = "Protocol type, such as: TCP, HTTP")
```
[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997804725
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,36 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
// if the heartbeat was not in the cache, insert or update the node by the heartbeat info
HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
- boolean exist = true;
- int updateNum = UPDATE_ZERO_ROW;
- if (lastHeartbeat == null) {
- exist = false;
- InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
- if (clusterNode == null) {
- updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
- log.info("insert node result: {}", updateNum);
- } else {
- updateNum = updateClusterNode(clusterNode);
- log.info("update node result: {}", updateNum);
+ String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+ String protocolType = heartbeat.getProtocolType();
+ String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+ ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+ int handlerNum = 0;
+ for (int i = 0; i < ports.length; i++) {
+ HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(
Review Comment:
Suggest using the `JsonUtils` instead of a new `ObjectMapper`.
[GitHub] [inlong] dockerzhang commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
dockerzhang commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996660504
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java:
##########
@@ -69,27 +71,36 @@ public class HeartbeatServiceImpl implements HeartbeatService {
private StreamHeartbeatEntityMapper streamHeartbeatMapper;
@Override
- public Boolean reportHeartbeat(HeartbeatReportRequest request) {
- if (request == null || StringUtils.isBlank(request.getComponentType())) {
- log.warn("request is null or component null, just return");
+ public Boolean reportHeartbeat(List<HeartbeatReportRequest> requests) {
+ if (requests == null || requests.isEmpty()) {
return false;
}
- if (log.isDebugEnabled()) {
- log.debug("received heartbeat: " + request);
- }
- heartbeatManager.reportHeartbeat(request);
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- case SDK:
- return updateHeartbeatOpt(request);
- default:
- log.error("Unsupported componentType={} for Inlong", componentType);
- return false;
+ List<HeartbeatMsg> heartbeatMsgs = new ArrayList<>();
+ for (HeartbeatReportRequest request : requests) {
+ if (request == null || StringUtils.isBlank(request.getComponentType())) {
+ log.warn("request is null or component null, just return");
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("received heartbeat: " + request);
+ }
+ heartbeatMsgs.add(request);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ case SDK:
Review Comment:
Why does `updateHeartbeatOpt` only support SDK?
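For context, stacked `case` labels in a Java `switch` share the body that follows the last label, which is how the removed lines of the hunk above route all five component types to `updateHeartbeatOpt`. A minimal sketch of that fall-through; the enum and method below are simplified stand-ins, not the InLong classes.

```java
// Simplified stand-ins for illustration; not the real ComponentTypeEnum or service.
final class FallThroughDemo {

    enum Component { SORT, DATAPROXY, AGENT, CACHE, SDK, UNKNOWN }

    // Returns true when the component type is routed to the shared handler.
    static boolean handled(Component type) {
        switch (type) {
            case SORT:
            case DATAPROXY:
            case AGENT:
            case CACHE:
            case SDK:
                // every label above falls through to this single body
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (Component c : Component.values()) {
            System.out.println(c + " -> " + handled(c));
        }
    }
}
```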
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996662247
##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,16 @@ manager.hosts=127.0.0.1:8083
manager.auth.secretId=
manager.auth.secretKey=
proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+#support HTTP or TCP, default TCP. for example: proxy.report.protocol.type=TCP, proxy.report.protocol.type=HTTP, proxy.report.protocol.type=TCP,HTTP
+proxy.report.protocol.type=TCP
Review Comment:
done
[GitHub] [inlong] GanfengTan commented on pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on PR #6184:
URL: https://github.com/apache/inlong/pull/6184#issuecomment-1278729847
> For configuration, I don't know whether any user reports over HTTP only, without TCP. If so, the docker script should handle that case.
Thanks, done as suggested.
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996999607
##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -21,6 +21,9 @@ local_ip=$(ifconfig | grep inet | grep -v inet6 | grep -v "127.0.0.1" | awk '{pr
# config
cd "${file_path}/"
common_conf_file=./conf/common.properties
+pulsar_conf_file=./conf/dataproxy-pulsar.conf
+tube_conf_file=./conf/dataproxy-tube.conf
Review Comment:
done
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][DataProxy][Manager] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997907438
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,36 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
// if the heartbeat was not in the cache, insert or update the node by the heartbeat info
HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
- boolean exist = true;
- int updateNum = UPDATE_ZERO_ROW;
- if (lastHeartbeat == null) {
- exist = false;
- InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
- if (clusterNode == null) {
- updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
- log.info("insert node result: {}", updateNum);
- } else {
- updateNum = updateClusterNode(clusterNode);
- log.info("update node result: {}", updateNum);
+ String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+ String protocolType = heartbeat.getProtocolType();
+ String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+ ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+ int handlerNum = 0;
+ for (int i = 0; i < ports.length; i++) {
+ HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(
Review Comment:
https://github.com/apache/inlong/pull/6184/commits/9c88ea1bba0c93c3eba399f38c58bdf8b6330ebf
[GitHub] [inlong] gosonzhang commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996659288
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java:
##########
@@ -79,7 +84,7 @@ public void start() {
}
@Override
- public void reportHeartbeat(HeartbeatMsg heartbeat) {
+ public void reportHeartbeat(List<HeartbeatMsg> heartbeat) {
Review Comment:
Why report multiple heartbeats?
A DataProxy node can start multiple different sources, but it does not need to construct a heartbeat message for each source.
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997000426
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java:
##########
@@ -69,27 +71,36 @@ public class HeartbeatServiceImpl implements HeartbeatService {
private StreamHeartbeatEntityMapper streamHeartbeatMapper;
@Override
- public Boolean reportHeartbeat(HeartbeatReportRequest request) {
- if (request == null || StringUtils.isBlank(request.getComponentType())) {
- log.warn("request is null or component null, just return");
+ public Boolean reportHeartbeat(List<HeartbeatReportRequest> requests) {
+ if (requests == null || requests.isEmpty()) {
return false;
}
- if (log.isDebugEnabled()) {
- log.debug("received heartbeat: " + request);
- }
- heartbeatManager.reportHeartbeat(request);
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- case SDK:
- return updateHeartbeatOpt(request);
- default:
- log.error("Unsupported componentType={} for Inlong", componentType);
- return false;
+ List<HeartbeatMsg> heartbeatMsgs = new ArrayList<>();
+ for (HeartbeatReportRequest request : requests) {
+ if (request == null || StringUtils.isBlank(request.getComponentType())) {
+ log.warn("request is null or component null, just return");
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("received heartbeat: " + request);
+ }
+ heartbeatMsgs.add(request);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ case SDK:
Review Comment:
Has been rolled back to previous logic.
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995544318
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java:
##########
@@ -62,6 +62,7 @@ public class SenderManager {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+ private static final String PROTOCOL_TYPE_TCP = "tcp";
Review Comment:
It has been moved.
[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995461153
##########
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java:
##########
@@ -49,46 +51,58 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
@Test
void testReportHeartbeat() throws InterruptedException {
- HeartbeatMsg msg = createHeartbeatMsg();
- heartbeatManager.reportHeartbeat(msg);
+ List<HeartbeatMsg> heartbeatMsgs = createHeartbeatMsg();
+ heartbeatManager.reportHeartbeat(heartbeatMsgs);
InlongClusterEntity entity = clusterMapper.selectById(1);
log.info(JsonUtils.toJsonString(entity));
- List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(null, msg.getClusterName(),
- msg.getComponentType());
- Assertions.assertEquals(1, clusterEntities.size());
- Assertions.assertEquals(clusterEntities.get(0).getName(), msg.getClusterName());
+ for (HeartbeatMsg msg : heartbeatMsgs) {
+ if (null == msg.getProtocolType() || StringUtils.isBlank(msg.getProtocolType())) {
Review Comment:
```suggestion
if (StringUtils.isBlank(msg.getProtocolType())) {
```
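The suggestion works because, assuming `StringUtils` here is the Apache Commons Lang class, `isBlank` already returns true for `null`, so the explicit null check adds nothing. A dependency-free sketch of the same semantics; `isBlank` below is a local re-implementation for illustration, not the library source.

```java
final class BlankCheck {

    // Local stand-in mirroring the documented behaviour: null, "" and
    // whitespace-only input all count as blank.
    static boolean isBlank(CharSequence cs) {
        if (cs == null) {
            return true;
        }
        for (int i = 0; i < cs.length(); i++) {
            if (!Character.isWhitespace(cs.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String protocolType = null;
        // Both conditions agree for every input, so the shorter one is enough.
        boolean verbose = null == protocolType || isBlank(protocolType);
        boolean concise = isBlank(protocolType);
        System.out.println(verbose == concise);
    }
}
```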
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995544672
##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,15 @@ manager.hosts=127.0.0.1:8083
manager.auth.secretId=
manager.auth.secretKey=
proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+proxy.report.tcp.protocol=tcp
# proxy cluster name
proxy.cluster.name=default_dataproxy
proxy.cluster.tag=default_cluster
proxy.cluster.inCharges=admin
# check interval of local config (millisecond)
configCheckInterval=10000
-
Review Comment:
This has been moved to another PR.
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997751319
##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -29,6 +32,17 @@ sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_co
sed -i "s/proxy.cluster.name=.*$/proxy.cluster.name=${CLUSTER_NAME}/g" "${common_conf_file}"
sed -i "s/proxy.cluster.inCharges=.*$/proxy.cluster.inCharges=${CLUSTER_IN_CHARGES}/g" "${common_conf_file}"
+if [ "${PROTOCOL_HTTP}" = "http" ]; then
+ echo "proxy.report.http.protocol=${PROTOCOL_HTTP}" >> "${common_conf_file}"
+ if [ "${MQ_TYPE}" = "tubemq" ]; then
+ sed -i "s/agent1.sources=.*$/agent1.sources=tcp-source http-source/g" "${tube_conf_file}"
+ sed -n '29~49p' ${http_conf_file} >> ${tube_conf_file}
+ else
Review Comment:
Has been removed.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,35 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
// if the heartbeat was not in the cache, insert or update the node by the heartbeat info
HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
- boolean exist = true;
- int updateNum = UPDATE_ZERO_ROW;
- if (lastHeartbeat == null) {
- exist = false;
- InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
- if (clusterNode == null) {
- updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
- log.info("insert node result: {}", updateNum);
- } else {
- updateNum = updateClusterNode(clusterNode);
- log.info("update node result: {}", updateNum);
+ String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+ String protocolType = heartbeat.getProtocolType();
+ String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+ ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+ for (int i = 0; i < ports.length; i++) {
+ HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsBytes(heartbeat), HeartbeatMsg.class);
+ heartbeatMsg.setPort(ports[i].trim());
+ heartbeatMsg.setProtocolType(protocolType);
+ if (protocolTypes != null) {
+ heartbeatMsg.setProtocolType(protocolTypes[i]);
+ }
+ if (lastHeartbeat == null) {
+ InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeatMsg);
+ if (clusterNode == null) {
+ insertClusterNode(clusterInfo, heartbeatMsg, clusterInfo.getCreator());
+ } else {
+ updateClusterNode(clusterNode);
Review Comment:
done
[GitHub] [inlong] healchow merged pull request #6184: [INLONG-6181][Agent][DataProxy][Manager] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow merged PR #6184:
URL: https://github.com/apache/inlong/pull/6184
[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995466764
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java:
##########
@@ -62,6 +62,7 @@ public class SenderManager {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
+ private static final String PROTOCOL_TYPE_TCP = "tcp";
Review Comment:
Suggest moving `org.apache.inlong.manager.common.consts.ProtocolType` to inlong-common (`org.apache.inlong.common.constant`) and using it in Agent, DataProxy, etc.
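A shared constant of the kind suggested here might look roughly like the following. This is only a sketch: the enum name, its values and the `forName` helper are assumptions, and the actual `ProtocolType` class in InLong may be shaped differently.

```java
import java.util.Locale;

// Hypothetical shape of a shared protocol constant; all names are assumptions.
enum ProtocolTypeSketch {
    TCP, HTTP;

    // Case-insensitive lookup so that "tcp" from a config file and "TCP" match.
    // Unknown names surface as IllegalArgumentException from valueOf.
    static ProtocolTypeSketch forName(String name) {
        if (name == null) {
            throw new IllegalArgumentException("protocol type is null");
        }
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }
}
```

A module-local literal such as `PROTOCOL_TYPE_TCP = "tcp"` could then be replaced by a reference to the shared constant, so that all modules agree on the spelling.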
[GitHub] [inlong] lucaspeng12138 commented on pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on PR #6184:
URL: https://github.com/apache/inlong/pull/6184#issuecomment-1278639017
For configuration, I don't know whether any user reports over HTTP only, without TCP. If so, the docker script should handle that case.
[GitHub] [inlong] lucaspeng12138 commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997697217
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java:
##########
@@ -97,26 +99,35 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
// if the heartbeat was not in the cache, insert or update the node by the heartbeat info
HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat);
- boolean exist = true;
- int updateNum = UPDATE_ZERO_ROW;
- if (lastHeartbeat == null) {
- exist = false;
- InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat);
- if (clusterNode == null) {
- updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator());
- log.info("insert node result: {}", updateNum);
- } else {
- updateNum = updateClusterNode(clusterNode);
- log.info("update node result: {}", updateNum);
+ String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
+ String protocolType = heartbeat.getProtocolType();
+ String[] protocolTypes = StringUtils.isNoneBlank(protocolType) && ports.length > 1
+ ? heartbeat.getProtocolType().split(InlongConstants.COMMA) : null;
+ for (int i = 0; i < ports.length; i++) {
+ HeartbeatMsg heartbeatMsg = OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsBytes(heartbeat), HeartbeatMsg.class);
+ heartbeatMsg.setPort(ports[i].trim());
+ heartbeatMsg.setProtocolType(protocolType);
+ if (protocolTypes != null) {
+ heartbeatMsg.setProtocolType(protocolTypes[i]);
+ }
+ if (lastHeartbeat == null) {
+ InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeatMsg);
+ if (clusterNode == null) {
+ insertClusterNode(clusterInfo, heartbeatMsg, clusterInfo.getCreator());
+ } else {
+ updateClusterNode(clusterNode);
Review Comment:
This may trigger the bug in https://github.com/apache/inlong/issues/6159. Please check the result returned by the MySQL operation.
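One way to read this request is to check the affected-row count returned by the insert or update and treat anything other than one row as a failure, similar to the `updateNum` bookkeeping in the removed lines. A minimal sketch under that assumption; `NodeWriter`, `UPDATE_ONE_ROW` and `save` are hypothetical names, not InLong APIs.

```java
import java.util.function.IntSupplier;

final class NodeWriter {

    // Assumed convention: a successful node write touches exactly one row.
    private static final int UPDATE_ONE_ROW = 1;

    // Runs a database write (passed in as a supplier of the affected-row count)
    // and reports whether it touched exactly one row. Runtime exceptions thrown
    // by the write are logged and counted as failure.
    static boolean save(IntSupplier write) {
        try {
            return write.getAsInt() == UPDATE_ONE_ROW;
        } catch (RuntimeException e) {
            System.err.println("node write failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = save(() -> 1);      // stands in for a successful mapper call
        boolean failed = save(() -> 0);  // stands in for a write that changed nothing
        System.out.println(ok + " " + failed);
    }
}
```

A caller could, for example, skip caching the heartbeat when `save` returns false, so that the next heartbeat retries the write.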
[GitHub] [inlong] lucaspeng12138 commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995466315
##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -29,6 +32,17 @@ sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_co
sed -i "s/proxy.cluster.name=.*$/proxy.cluster.name=${CLUSTER_NAME}/g" "${common_conf_file}"
sed -i "s/proxy.cluster.inCharges=.*$/proxy.cluster.inCharges=${CLUSTER_IN_CHARGES}/g" "${common_conf_file}"
+if [ "${PROTOCOL_HTTP}" = "http" ]; then
+ echo "proxy.report.http.protocol=${PROTOCOL_HTTP}" >> "${common_conf_file}"
+ if [ "${MQ_TYPE}" = "tubemq" ]; then
+ sed -i "s/agent1.sources=.*$/agent1.sources=tcp-source http-source/g" "${tube_conf_file}"
+ sed -n '29~49p' ${http_conf_file} >> ${tube_conf_file}
+ else
Review Comment:
The code formatting seems misaligned.
[GitHub] [inlong] dockerzhang commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
dockerzhang commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r996652178
##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,16 @@ manager.hosts=127.0.0.1:8083
manager.auth.secretId=
manager.auth.secretKey=
proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+#support HTTP or TCP, default TCP. for example: proxy.report.protocol.type=TCP, proxy.report.protocol.type=HTTP, proxy.report.protocol.type=TCP,HTTP
+proxy.report.protocol.type=TCP
Review Comment:
->
# support HTTP/TCP/TCP,HTTP
proxy.report.protocol.type=TCP
proxy.report.tcp.port=46801
proxy.report.http.port=46802
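To illustrate how the keys suggested above could be consumed, here is a minimal sketch that reads `proxy.report.protocol.type` (defaulting to TCP) and pairs each listed protocol with its port key. The class name `ReportProtocolConfig` and the `PROTOCOL:port` output format are assumptions made for illustration; this is not the DataProxy implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper (not from the PR) that resolves the reporting endpoints.
final class ReportProtocolConfig {
    private ReportProtocolConfig() {
    }

    // Returns one "PROTOCOL:port" entry per protocol listed in
    // proxy.report.protocol.type, in the order they are listed.
    static List<String> endpoints(Properties props) {
        String types = props.getProperty("proxy.report.protocol.type", "TCP");
        List<String> result = new ArrayList<>();
        for (String raw : types.split(",")) {
            String type = raw.trim().toUpperCase(Locale.ROOT);
            if (type.isEmpty()) {
                continue; // tolerate stray commas such as "TCP,"
            }
            if (!type.equals("TCP") && !type.equals("HTTP")) {
                throw new IllegalArgumentException("Unsupported protocol: " + type);
            }
            String portKey = "proxy.report." + type.toLowerCase(Locale.ROOT) + ".port";
            String port = props.getProperty(portKey);
            if (port == null) {
                throw new IllegalArgumentException("Missing property: " + portKey);
            }
            result.add(type + ":" + Integer.parseInt(port.trim()));
        }
        return result;
    }
}
```

With only the two port keys set, the sketch yields a single TCP endpoint; setting `proxy.report.protocol.type=TCP,HTTP` yields both.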
##########
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh:
##########
@@ -21,6 +21,9 @@ local_ip=$(ifconfig | grep inet | grep -v inet6 | grep -v "127.0.0.1" | awk '{pr
# config
cd "${file_path}/"
common_conf_file=./conf/common.properties
+pulsar_conf_file=./conf/dataproxy-pulsar.conf
+tube_conf_file=./conf/dataproxy-tube.conf
Review Comment:
Here we can add CONFIG_FILE="dataproxy-${MQ_TYPE}.conf" to select the configuration file for each MQ type.
--
[GitHub] [inlong] healchow commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r995467960
##########
inlong-dataproxy/conf/common.properties:
##########
@@ -23,14 +23,15 @@ manager.hosts=127.0.0.1:8083
manager.auth.secretId=
manager.auth.secretKey=
proxy.report.ip=127.0.0.1
-proxy.report.port=46801
+proxy.report.tcp.port=46801
+proxy.report.http.port=46802
+proxy.report.tcp.protocol=tcp
# proxy cluster name
proxy.cluster.name=default_dataproxy
proxy.cluster.tag=default_cluster
proxy.cluster.inCharges=admin
# check interval of local config (millisecond)
configCheckInterval=10000
-
Review Comment:
Please keep this blank line; it is more readable.
--
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997001112
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java:
##########
@@ -79,7 +84,7 @@ public void start() {
}
@Override
- public void reportHeartbeat(HeartbeatMsg heartbeat) {
+ public void reportHeartbeat(List<HeartbeatMsg> heartbeat) {
Review Comment:
Has been removed.
--
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6184: [INLONG-6181][Agent][Dataproxy][SDK] Support node protocol reporting and query
Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #6184:
URL: https://github.com/apache/inlong/pull/6184#discussion_r997805100
##########
inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java:
##########
@@ -33,7 +33,7 @@ public class ComponentHeartbeat {
private String ip;
- private int port;
+ private String port;
Review Comment:
DP supports multiple ports.
![WeCom screenshot_33dc10fa-fc6b-4ea9-b4ed-040f7d099eac](https://user-images.githubusercontent.com/17596132/196360973-d0f4f58e-d696-4ad4-8f46-4eb768ebc204.png)
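As an illustration of why a `String` field can carry several ports where an `int` cannot, the sketch below parses a port string into a list of integers. It assumes the ports are comma-separated, which the comment above does not state; `PortList` is a hypothetical helper, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from the PR); assumes a comma-separated port string.
final class PortList {
    private PortList() {
    }

    // Parses e.g. "46801,46802" into [46801, 46802]; null or blank input yields an empty list.
    static List<Integer> parse(String ports) {
        List<Integer> result = new ArrayList<>();
        if (ports == null) {
            return result;
        }
        for (String raw : ports.split(",")) {
            String value = raw.trim();
            if (value.isEmpty()) {
                continue;
            }
            int port = Integer.parseInt(value);
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + port);
            }
            result.add(port);
        }
        return result;
    }
}
```

A single-port value such as "46801" still parses to a one-element list, so the existing single-port case would remain representable under this assumption.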
--