You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/04 16:05:24 UTC
[incubator-inlong] branch master updated: [INLONG-2906][Manager] Fix conflicts defined between dataproxy and sort #2907
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 36dc2dc [INLONG-2906][Manager] Fix conflicts defined between dataproxy and sort #2907
36dc2dc is described below
commit 36dc2dce5d1ca412f39cdbeeb978ea7cecd577e5
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 00:05:17 2022 +0800
[INLONG-2906][Manager] Fix conflicts defined between dataproxy and sort #2907
---
.../manager/client/api/util/InlongStreamTransfer.java | 5 +++--
.../service/core/impl/DataProxyClusterServiceImpl.java | 13 +++++++------
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 5d21b3a..d97379f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -30,10 +30,11 @@ public class InlongStreamTransfer {
public static InlongStreamInfo createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
InlongStreamInfo dataStreamInfo = new InlongStreamInfo();
dataStreamInfo.setInlongGroupId(groupInfo.getInlongGroupId());
- dataStreamInfo.setInlongStreamId("b_" + streamConf.getName());
+ final String streamId = "b_" + streamConf.getName();
+ dataStreamInfo.setInlongStreamId(streamId);
dataStreamInfo.setName(streamConf.getName());
dataStreamInfo.setDataEncoding(streamConf.getCharset().name());
- dataStreamInfo.setMqResourceObj(groupInfo.getMqResourceObj() + "_" + streamConf.getName());
+ dataStreamInfo.setMqResourceObj(streamId);
dataStreamInfo.setDataSeparator(String.valueOf(streamConf.getDataSeparator().getAsciiCode()));
dataStreamInfo.setDescription(streamConf.getDescription());
dataStreamInfo.setCreator(groupInfo.getCreator());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 3a7820c..d640caa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -218,10 +218,10 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
} else if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
for (InlongStreamEntity stream : streamList) {
+ String topic = stream.getMqResourceObj();
String streamId = stream.getInlongStreamId();
config.setInlongGroupId(groupId + "/" + streamId);
- config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + groupId + "/" + streamId);
-
+ config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + bizResource + "/" + topic);
}
}
configList.add(config);
@@ -261,22 +261,23 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
// based on group id, get topic list
for (InlongGroupEntity inlongGroupEntity : groupEntities) {
// for (String groupId : groupIdList) {
- String groupId = inlongGroupEntity.getInlongGroupId();
+ final String groupId = inlongGroupEntity.getInlongGroupId();
+ final String mqResource = inlongGroupEntity.getMqResourceObj();
if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
-
for (InlongStreamEntity stream : streamList) {
DataProxyConfig topicConfig = new DataProxyConfig();
String streamId = stream.getInlongStreamId();
+ String topic = stream.getMqResourceObj();
topicConfig.setInlongGroupId(groupId + "/" + streamId);
- topicConfig.setTopic("persistent://" + tenant + "/" + groupId + "/" + streamId);
+ topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
topicList.add(topicConfig);
}
} else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) {
DataProxyConfig topicConfig = new DataProxyConfig();
topicConfig.setInlongGroupId(groupId);
- topicConfig.setTopic(groupId);
+ topicConfig.setTopic(mqResource);
topicList.add(topicConfig);
}