You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/08 13:13:29 UTC

[incubator-inlong] 03/03: [INLONG-4598][Manager] Fix the Pulsar topic not match error (#4599)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 963e7ea7a201803a7b5141c987824de267047132
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Jun 8 21:08:03 2022 +0800

    [INLONG-4598][Manager] Fix the Pulsar topic not match error (#4599)
---
 .../inlong/manager/common/pojo/source/pulsar/PulsarSource.java       | 2 +-
 .../inlong/manager/service/sort/CreateSortConfigListenerV2.java      | 3 +++
 .../apache/inlong/manager/service/sort/util/ExtractNodeUtils.java    | 5 +++--
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java
index 57fdc4a05..7a839b844 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java
@@ -43,7 +43,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 public class PulsarSource extends StreamSource {
 
     @ApiModelProperty("Pulsar tenant")
-    private String tenant = "default";
+    private String tenant = "public";
 
     @ApiModelProperty("Pulsar namespace")
     private String namespace;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index b107998bf..5276fd5b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -144,9 +144,12 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
         String adminUrl = pulsarCluster.getAdminUrl();
         String serviceUrl = pulsarCluster.getUrl();
+        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongGroupSettings.DEFAULT_PULSAR_TENANT
+                : pulsarCluster.getTenant();
         streamInfoList.forEach(streamInfo -> {
             PulsarSource pulsarSource = new PulsarSource();
             String streamId = streamInfo.getInlongStreamId();
+            pulsarSource.setTenant(tenant);
             pulsarSource.setSourceName(streamId);
             pulsarSource.setNamespace(groupInfo.getMqResource());
             pulsarSource.setTopic(streamInfo.getMqResource());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 3f3c44011..406376798 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -231,7 +231,8 @@ public class ExtractNodeUtils {
         List<FieldInfo> fieldInfos = streamFields.stream()
                 .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
-        String topic = pulsarSource.getTopic();
+        String fullTopicName =
+                pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
 
         Format format;
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
@@ -269,7 +270,7 @@ public class ExtractNodeUtils {
                 fieldInfos,
                 null,
                 Maps.newHashMap(),
-                topic,
+                fullTopicName,
                 adminUrl,
                 serviceUrl,
                 format,