You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/08 13:13:29 UTC
[incubator-inlong] 03/03: [INLONG-4598][Manager] Fix the Pulsar topic mismatch error (#4599)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 963e7ea7a201803a7b5141c987824de267047132
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Jun 8 21:08:03 2022 +0800
[INLONG-4598][Manager] Fix the Pulsar topic mismatch error (#4599)
---
.../inlong/manager/common/pojo/source/pulsar/PulsarSource.java | 2 +-
.../inlong/manager/service/sort/CreateSortConfigListenerV2.java | 3 +++
.../apache/inlong/manager/service/sort/util/ExtractNodeUtils.java | 5 +++--
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java
index 57fdc4a05..7a839b844 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java
@@ -43,7 +43,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
public class PulsarSource extends StreamSource {
@ApiModelProperty("Pulsar tenant")
- private String tenant = "default";
+ private String tenant = "public";
@ApiModelProperty("Pulsar namespace")
private String namespace;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index b107998bf..5276fd5b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -144,9 +144,12 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
String adminUrl = pulsarCluster.getAdminUrl();
String serviceUrl = pulsarCluster.getUrl();
+ String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongGroupSettings.DEFAULT_PULSAR_TENANT
+ : pulsarCluster.getTenant();
streamInfoList.forEach(streamInfo -> {
PulsarSource pulsarSource = new PulsarSource();
String streamId = streamInfo.getInlongStreamId();
+ pulsarSource.setTenant(tenant);
pulsarSource.setSourceName(streamId);
pulsarSource.setNamespace(groupInfo.getMqResource());
pulsarSource.setTopic(streamInfo.getMqResource());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 3f3c44011..406376798 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -231,7 +231,8 @@ public class ExtractNodeUtils {
List<FieldInfo> fieldInfos = streamFields.stream()
.map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
- String topic = pulsarSource.getTopic();
+ String fullTopicName =
+ pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
Format format;
DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
@@ -269,7 +270,7 @@ public class ExtractNodeUtils {
fieldInfos,
null,
Maps.newHashMap(),
- topic,
+ fullTopicName,
adminUrl,
serviceUrl,
format,