You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/04 16:05:24 UTC

[incubator-inlong] branch master updated: [INLONG-2906][Manager] Fix conflicts defined between dataproxy and sort #2907

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 36dc2dc  [INLONG-2906][Manager] Fix conflicts defined between dataproxy and sort #2907
36dc2dc is described below

commit 36dc2dce5d1ca412f39cdbeeb978ea7cecd577e5
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 00:05:17 2022 +0800

    [INLONG-2906][Manager] Fix conflicts defined between dataproxy and sort #2907
---
 .../manager/client/api/util/InlongStreamTransfer.java       |  5 +++--
 .../service/core/impl/DataProxyClusterServiceImpl.java      | 13 +++++++------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 5d21b3a..d97379f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -30,10 +30,11 @@ public class InlongStreamTransfer {
     public static InlongStreamInfo createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
         InlongStreamInfo dataStreamInfo = new InlongStreamInfo();
         dataStreamInfo.setInlongGroupId(groupInfo.getInlongGroupId());
-        dataStreamInfo.setInlongStreamId("b_" + streamConf.getName());
+        final String streamId = "b_" + streamConf.getName();
+        dataStreamInfo.setInlongStreamId(streamId);
         dataStreamInfo.setName(streamConf.getName());
         dataStreamInfo.setDataEncoding(streamConf.getCharset().name());
-        dataStreamInfo.setMqResourceObj(groupInfo.getMqResourceObj() + "_" + streamConf.getName());
+        dataStreamInfo.setMqResourceObj(streamId);
         dataStreamInfo.setDataSeparator(String.valueOf(streamConf.getDataSeparator().getAsciiCode()));
         dataStreamInfo.setDescription(streamConf.getDescription());
         dataStreamInfo.setCreator(groupInfo.getCreator());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 3a7820c..d640caa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -218,10 +218,10 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
             } else if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
                 List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
                 for (InlongStreamEntity stream : streamList) {
+                    String topic = stream.getMqResourceObj();
                     String streamId = stream.getInlongStreamId();
                     config.setInlongGroupId(groupId + "/" + streamId);
-                    config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + groupId + "/" + streamId);
-
+                    config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + bizResource + "/" + topic);
                 }
             }
             configList.add(config);
@@ -261,22 +261,23 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
         // based on group id, get topic list
         for (InlongGroupEntity inlongGroupEntity : groupEntities) {
 //        for (String groupId : groupIdList) {
-            String groupId = inlongGroupEntity.getInlongGroupId();
+            final String groupId = inlongGroupEntity.getInlongGroupId();
+            final String mqResource = inlongGroupEntity.getMqResourceObj();
             if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
                 List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
-
                 for (InlongStreamEntity stream : streamList) {
                     DataProxyConfig topicConfig = new DataProxyConfig();
                     String streamId = stream.getInlongStreamId();
+                    String topic = stream.getMqResourceObj();
                     topicConfig.setInlongGroupId(groupId + "/" + streamId);
-                    topicConfig.setTopic("persistent://" + tenant + "/" + groupId + "/" + streamId);
+                    topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
                     topicList.add(topicConfig);
 
                 }
             } else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) {
                 DataProxyConfig topicConfig = new DataProxyConfig();
                 topicConfig.setInlongGroupId(groupId);
-                topicConfig.setTopic(groupId);
+                topicConfig.setTopic(mqResource);
                 topicList.add(topicConfig);
 
             }