You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/07 12:08:46 UTC
[incubator-inlong] branch master updated: [INLONG-2973][Manager] Get pulsar info from the cluster table (#2978)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d208304 [INLONG-2973][Manager] Get pulsar info from the cluster table (#2978)
d208304 is described below
commit d208304c9334d1d2ad5205cd5cb815f40cbe6ad2
Author: pacino <ge...@gmail.com>
AuthorDate: Mon Mar 7 20:08:20 2022 +0800
[INLONG-2973][Manager] Get pulsar info from the cluster table (#2978)
---
.../common/pojo/dataproxy/PulsarClusterInfo.java | 40 ++++++-------------
.../inlong/manager/client/api/InlongGroupConf.java | 3 ++
.../client/api/util/InlongGroupTransfer.java | 9 +++--
.../manager/common/pojo/group/InlongGroupInfo.java | 10 +++--
.../common/pojo/group/InlongGroupRequest.java | 3 ++
.../common/pojo/group/InlongGroupResponse.java | 10 +++--
.../resources/mappers/InlongGroupEntityMapper.xml | 15 ++++++-
.../manager/service/CommonOperateService.java | 20 ++++++++++
.../service/core/impl/InlongGroupServiceImpl.java | 6 ++-
.../mq/CreatePulsarGroupForStreamTaskListener.java | 14 ++++---
.../mq/CreatePulsarGroupTaskListener.java | 17 ++++----
.../mq/CreatePulsarResourceTaskListener.java | 23 ++++++-----
.../mq/CreatePulsarTopicForStreamTaskListener.java | 18 +++++----
.../service/thirdparty/mq/util/PulsarUtils.java | 46 +++++++---------------
.../thirdparty/sort/CreateSortConfigListener.java | 15 +++----
.../thirdparty/sort/PushSortConfigListener.java | 13 +++---
.../thirdparty/sort/util/SourceInfoUtils.java | 44 +++++----------------
.../ConsumptionCompleteProcessListener.java | 23 ++++++-----
.../thirdparty/mq/util/PulsarUtilsTest.java | 11 +++---
19 files changed, 175 insertions(+), 165 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
index a942575..f72d6c7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
@@ -17,35 +17,21 @@
package org.apache.inlong.common.pojo.dataproxy;
-import java.util.HashMap;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
import java.util.Map;
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
public class PulsarClusterInfo {
- private String url;
- private String token;
- private Map<String, String> params = new HashMap<>();
-
- public String getUrl() {
- return url;
- }
-
- public void setUrl(String url) {
- this.url = url;
- }
- public String getToken() {
- return token;
- }
-
- public void setToken(String token) {
- this.token = token;
- }
-
- public Map<String, String> getParams() {
- return params;
- }
-
- public void setParams(Map<String, String> params) {
- this.params = params;
- }
+ private String adminUrl;
+ private String token;
+ private String brokerServiceUrl;
+ private Map<String, String> ext;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
index 1fbecb9..4ab7ff8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
@@ -58,4 +58,7 @@ public class InlongGroupConf {
@ApiModelProperty("Need zookeeper support")
private boolean zookeeperEnabled = true;
+
+ @ApiModelProperty("data proxy cluster id")
+ private Integer proxyClusterId;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 6bdfa6b..8e948f5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -19,9 +19,6 @@ package org.apache.inlong.manager.client.api.util;
import com.google.common.collect.Lists;
import com.google.gson.reflect.TypeToken;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -46,6 +43,10 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.JsonUtils;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
public class InlongGroupTransfer {
public static InlongGroupConf parseGroupResponse(InlongGroupResponse groupResponse) {
@@ -58,6 +59,7 @@ public class InlongGroupTransfer {
inlongGroupConf.setPeakRecords(Long.valueOf(groupResponse.getPeakRecords()));
inlongGroupConf.setMqBaseConf(parseMqBaseConf(groupResponse));
inlongGroupConf.setSortBaseConf(parseSortBaseConf(groupResponse));
+ inlongGroupConf.setProxyClusterId(groupResponse.getProxyClusterId());
return inlongGroupConf;
}
@@ -187,6 +189,7 @@ public class InlongGroupTransfer {
groupInfo.setDailyRecords(groupConf.getDailyRecords().intValue());
groupInfo.setPeakRecords(groupConf.getPeakRecords().intValue());
groupInfo.setMaxLength(groupConf.getMaxLength());
+ groupInfo.setProxyClusterId(groupConf.getProxyClusterId());
MqBaseConf mqConf = groupConf.getMqBaseConf();
MqType mqType = mqConf.getType();
groupInfo.setMiddlewareType(mqType.name());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java
index a9da378..70e58ad 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java
@@ -20,15 +20,16 @@ package org.apache.inlong.manager.common.pojo.group;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import java.util.List;
-import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+import java.util.List;
+
/**
* Inlong group info
*/
@@ -117,6 +118,9 @@ public class InlongGroupInfo {
@ApiModelProperty(value = "Temporary view, string in JSON format")
private String tempView;
+ @ApiModelProperty(value = "data proxy cluster id")
+ private Integer proxyClusterId;
+
@ApiModelProperty(value = "Inlong group Extension properties")
private List<InlongGroupExtInfo> extList;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
index 3ca513a..8909e36 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
@@ -117,6 +117,9 @@ public class InlongGroupRequest {
@ApiModelProperty(value = "Temporary view, string in JSON format")
private String tempView;
+ @ApiModelProperty(value = "data proxy cluster id")
+ private Integer proxyClusterId;
+
@ApiModelProperty(value = "Inlong group Extension properties")
private List<InlongGroupExtInfo> extList;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java
index d771e56..e4f4313 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java
@@ -20,14 +20,15 @@ package org.apache.inlong.manager.common.pojo.group;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import java.util.List;
-import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+import java.util.List;
+
@Data
@Builder
@NoArgsConstructor
@@ -113,6 +114,9 @@ public class InlongGroupResponse {
@ApiModelProperty(value = "Temporary view, string in JSON format")
private String tempView;
+ @ApiModelProperty(value = "data proxy cluster Id")
+ private Integer proxyClusterId;
+
@ApiModelProperty(value = "Inlong group Extension properties")
private List<InlongGroupExtInfo> extList;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index 88979c0..4999d54 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -54,7 +54,8 @@
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, name, cn_name, description, middleware_type, queue_module, topic_partition_num,
+ id
+ , inlong_group_id, name, cn_name, description, middleware_type, queue_module, topic_partition_num,
mq_resource_obj, daily_records, daily_storage, peak_records, max_length, schema_name, in_charges, followers,
status, is_deleted, creator, modifier, create_time, modify_time, temp_view, zookeeper_enabled, proxy_cluster_id
</sql>
@@ -231,6 +232,9 @@
<if test="zookeeperEnabled != null">
zookeeper_enabled,
</if>
+ <if test="proxyClusterId != null">
+ proxy_cluster_id,
+ </if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -305,6 +309,9 @@
<if test="zookeeperEnabled != null">
#{zookeeperEnabled,jdbcType=INTEGER},
</if>
+ <if test="proxyClusterId != null">
+ #{proxyClusterId,jdbcType=INTEGER},
+ </if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
@@ -379,6 +386,9 @@
<if test="zookeeperEnabled != null">
zookeeper_enabled = #{zookeeperEnabled,jdbcType=INTEGER},
</if>
+ <if test="proxyClusterId != null">
+ proxy_cluster_id = #{proxyClusterId,jdbcType=INTEGER},
+ </if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
@@ -445,6 +455,9 @@
<if test="zookeeperEnabled != null">
zookeeper_enabled = #{zookeeperEnabled,jdbcType=INTEGER},
</if>
+ <if test="proxyClusterId != null">
+ proxy_cluster_id = #{proxyClusterId,jdbcType=INTEGER},
+ </if>
</set>
where inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
and is_deleted = 0
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index d5326e5..345f9b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -18,12 +18,14 @@
package org.apache.inlong.manager.service;
import com.google.gson.Gson;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -139,6 +141,24 @@ public class CommonOperateService {
}
/**
+ * get pulsar cluster info
+ *
+ * @return
+ */
+ public PulsarClusterInfo getPulsarClusterInfo() {
+ ThirdPartyClusterEntity thirdPartyClusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
+ Preconditions.checkNotNull(thirdPartyClusterEntity.getExtParams(), "pulsar extParam is empty, check"
+ + "third party cluster table");
+ Map<String, String> configParams = JsonUtils.parse(thirdPartyClusterEntity.getExtParams(), Map.class);
+ PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
+ thirdPartyClusterEntity.getUrl()).token(thirdPartyClusterEntity.getToken()).build();
+ String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
+ Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
+ pulsarClusterInfo.setAdminUrl(adminUrl);
+ return pulsarClusterInfo;
+ }
+
+ /**
* Check whether the inlong group status is temporary
*
* @param groupId Inlong group id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index 3bdd112..5289333 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -22,6 +22,7 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -184,8 +185,9 @@ public class InlongGroupServiceImpl implements InlongGroupService {
if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
} else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
- groupInfo.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
- groupInfo.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL));
+ PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
+ groupInfo.setPulsarAdminUrl(pulsarClusterInfo.getAdminUrl());
+ groupInfo.setPulsarServiceUrl(pulsarClusterInfo.getBrokerServiceUrl());
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
index f448f9e..a814379 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -17,10 +17,9 @@
package org.apache.inlong.manager.service.thirdparty.mq;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -41,6 +40,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+
/**
* Create a subscription group for a single inlong stream
*/
@@ -85,9 +86,8 @@ public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListe
log.warn("inlong stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
return ListenerResult.success();
}
-
- try (PulsarAdmin globalPulsarAdmin = PulsarUtils
- .getPulsarAdmin(groupInfo, commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
// Query data sink info based on groupId and streamId
List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
if (sinkTypeList == null || sinkTypeList.size() == 0) {
@@ -108,7 +108,9 @@ public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListe
String namespace = groupInfo.getMqResourceObj();
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo, serviceUrl)) {
+ PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+ .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String fullTopic = tenant + "/" + namespace + "/" + topic;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
index 51b7355..304dc78 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
@@ -18,8 +18,8 @@
package org.apache.inlong.manager.service.thirdparty.mq;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -82,9 +82,8 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
return ListenerResult.success();
}
-
- try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(bizInfo,
- commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
String tenant = clusterBean.getDefaultTenant();
String namespace = bizInfo.getMqResourceObj();
@@ -98,14 +97,16 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
// Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
for (String cluster : pulsarClusters) {
- String url = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(bizInfo, url)) {
+ String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+ PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+ .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String topicFull = tenant + "/" + namespace + "/" + topic;
- log.error("topic={} not exists in {}", topicFull, url);
- throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + url);
+ log.error("topic={} not exists in {}", topicFull, serviceUrl);
+ throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
}
// Consumer naming rules: sortAppName_topicName_consumer_group
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
index 1ad4b56..9cb3d88 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
@@ -17,10 +17,9 @@
package org.apache.inlong.manager.service.thirdparty.mq;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -41,6 +40,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+
/**
* Create Pulsar tenant, namespace and topic
*/
@@ -76,13 +77,14 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
if (groupInfo == null) {
throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
}
-
- try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo,
- commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
- this.createPulsarProcess(groupInfo, serviceUrl);
+ PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+ .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+ this.createPulsarProcess(groupInfo, pulsarClusterInfo);
}
} catch (Exception e) {
log.error("create pulsar resource error for groupId={}", groupId, e);
@@ -96,9 +98,9 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
/**
* Create Pulsar tenant, namespace and topic
*/
- private void createPulsarProcess(InlongGroupInfo groupInfo, String serviceHttpUrl) throws Exception {
+ private void createPulsarProcess(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarClusterInfo) throws Exception {
String groupId = groupInfo.getInlongGroupId();
- log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, serviceHttpUrl);
+ log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, pulsarClusterInfo);
String namespace = groupInfo.getMqResourceObj();
Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty for groupId=" + groupId);
@@ -106,7 +108,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
Preconditions.checkNotNull(queueModule, "queue module cannot be empty for groupId=" + groupId);
String tenant = clusterBean.getDefaultTenant();
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo, serviceHttpUrl)) {
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
// create pulsar tenant
pulsarOptService.createTenant(pulsarAdmin, tenant);
@@ -125,7 +127,8 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
pulsarOptService.createTopic(pulsarAdmin, topicBean);
}
}
- log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId, serviceHttpUrl);
+ log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId,
+ pulsarClusterInfo.getAdminUrl());
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
index 5cc4783..3864ac2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -17,10 +17,9 @@
package org.apache.inlong.manager.service.thirdparty.mq;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -38,6 +37,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+
/**
* Create task listener for Pulsar Topic
*/
@@ -75,13 +76,15 @@ public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListe
}
log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
- try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo,
- commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+ PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+ .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
String pulsarTopic = streamEntity.getMqResourceObj();
- this.createTopic(groupInfo, pulsarTopic, serviceUrl);
+ this.createTopic(groupInfo, pulsarTopic, pulsarClusterInfo);
}
} catch (Exception e) {
log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
@@ -93,14 +96,15 @@ public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListe
return ListenerResult.success();
}
- private void createTopic(InlongGroupInfo bizInfo, String pulsarTopic, String serviceHttpUrl) throws Exception {
+ private void createTopic(InlongGroupInfo bizInfo, String pulsarTopic, PulsarClusterInfo pulsarClusterInfo)
+ throws Exception {
Integer partitionNum = bizInfo.getTopicPartitionNum();
int partition = 0;
if (partitionNum != null && partitionNum > 0) {
partition = partitionNum;
}
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(bizInfo, serviceHttpUrl)) {
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
PulsarTopicBean topicBean = PulsarTopicBean.builder()
.tenant(clusterBean.getDefaultTenant())
.namespace(bizInfo.getMqResourceObj())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java
index 02ee046..12ab78a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java
@@ -17,18 +17,18 @@
package org.apache.inlong.manager.service.thirdparty.mq.util;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
+import java.util.List;
+
/**
* Pulsar connection utils
*/
@@ -41,36 +41,18 @@ public class PulsarUtils {
/**
* Get pulsar admin info
*/
- public static PulsarAdmin getPulsarAdmin(InlongGroupInfo groupInfo, String defaultServiceUrl)
+ public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo)
throws PulsarClientException {
- if (CollectionUtils.isEmpty(groupInfo.getExtList())) {
- return getPulsarAdmin(defaultServiceUrl);
- }
- List<InlongGroupExtInfo> groupExtInfoList = groupInfo.getExtList();
- String pulsarServiceUrl = null;
- String pulsarAuthentication = null;
- String pulsarAuthenticationType = InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE;
- for (InlongGroupExtInfo extInfo : groupExtInfoList) {
- if (InlongGroupSettings.PULSAR_ADMIN_URL.equals(extInfo.getKeyName())
- && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
- pulsarServiceUrl = extInfo.getKeyValue();
- }
- if (InlongGroupSettings.PULSAR_AUTHENTICATION_TYPE.equals(extInfo.getKeyName())
- && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
- pulsarAuthenticationType = extInfo.getKeyValue();
- }
- if (InlongGroupSettings.PULSAR_AUTHENTICATION.equals(extInfo.getKeyName())
- && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
- pulsarAuthentication = extInfo.getKeyValue();
- }
- }
- if (StringUtils.isEmpty(pulsarServiceUrl) && StringUtils.isEmpty(pulsarAuthentication)) {
- return getPulsarAdmin(defaultServiceUrl);
- } else if (StringUtils.isEmpty(pulsarAuthentication)) {
- return getPulsarAdmin(pulsarServiceUrl);
+ Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "pulsar adminUrl is empty, "
+ + "check third party cluster table");
+ PulsarAdmin pulsarAdmin;
+ if (StringUtils.isEmpty(pulsarClusterInfo.getToken())) {
+ pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl());
} else {
- return getPulsarAdmin(pulsarServiceUrl, pulsarAuthentication, pulsarAuthenticationType);
+ pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getToken(),
+ InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE);
}
+ return pulsarAdmin;
}
/**
@@ -80,7 +62,7 @@ public class PulsarUtils {
return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
}
- public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String authentication, String authenticationType)
+ private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String authentication, String authenticationType)
throws PulsarClientException {
if (InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE.equals(authenticationType)) {
return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 2e4e227..075be69 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -18,15 +18,12 @@
package org.apache.inlong.manager.service.thirdparty.sort;
import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -67,6 +64,11 @@ import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
@Slf4j
@Component
public class CreateSortConfigListener implements SortOperateListener {
@@ -199,10 +201,9 @@ public class CreateSortConfigListener implements SortOperateListener {
DeserializationInfo deserializationInfo,
List<FieldInfo> fieldInfos) {
String topicName = streamInfo.getMqResourceObj();
- String pulsarAdminUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL);
- String pulsarServiceUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL);
+ PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
return SourceInfoUtils.createPulsarSourceInfo(groupInfo, topicName, deserializationInfo,
- fieldInfos, clusterBean.getAppName(), clusterBean.getDefaultTenant(), pulsarAdminUrl, pulsarServiceUrl);
+ fieldInfos, clusterBean.getAppName(), clusterBean.getDefaultTenant(), pulsarClusterInfo);
}
private TubeSourceInfo createTubeSourceInfo(InlongGroupInfo groupInfo,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index a7606aa..5c7abe0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -17,9 +17,8 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import java.util.ArrayList;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
@@ -56,6 +55,9 @@ import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.ArrayList;
+import java.util.List;
+
@Slf4j
@Component
public class PushSortConfigListener implements SortOperateListener {
@@ -149,7 +151,7 @@ public class PushSortConfigListener implements SortOperateListener {
* Get source info
*/
private SourceInfo getSourceInfo(InlongGroupInfo groupInfo,
- SinkResponse hiveResponse, List<StreamSinkFieldEntity> fieldList) {
+ SinkResponse hiveResponse, List<StreamSinkFieldEntity> fieldList) {
DeserializationInfo deserializationInfo = null;
String groupId = groupInfo.getInlongGroupId();
String streamId = hiveResponse.getInlongStreamId();
@@ -187,11 +189,10 @@ public class PushSortConfigListener implements SortOperateListener {
sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
} else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middleWare)) {
- String pulsarAdminUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL);
- String pulsarServiceUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL);
+ PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
sourceInfo = SourceInfoUtils.createPulsarSourceInfo(groupInfo, streamInfo.getMqResourceObj(),
deserializationInfo, sourceFields, clusterBean.getAppName(), clusterBean.getDefaultTenant(),
- pulsarAdminUrl, pulsarServiceUrl);
+ pulsarClusterInfo);
}
return sourceInfo;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 2027f20..9077073 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -17,19 +17,17 @@
package org.apache.inlong.manager.service.thirdparty.sort.util;
-import java.util.List;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.SourceType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import java.util.List;
+
public class SourceInfoUtils {
public static boolean isBinlogMigrationSource(SourceResponse sourceResponse) {
@@ -41,39 +39,15 @@ public class SourceInfoUtils {
}
public static PulsarSourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo, String pulsarTopic,
- DeserializationInfo deserializationInfo,
- List<FieldInfo> fieldInfos, String appName,String tenant,
- String pulsarAdminUrl, String pulsarServiceUrl) {
+ DeserializationInfo deserializationInfo,
+ List<FieldInfo> fieldInfos, String appName, String tenant,
+ PulsarClusterInfo pulsarClusterInfo) {
final String namespace = groupInfo.getMqResourceObj();
// Full name of Topic in Pulsar
final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
final String consumerGroup = appName + "_" + pulsarTopic + "_consumer_group";
- String adminUrl = null;
- String serviceUrl = null;
- String authentication = null;
- if (CollectionUtils.isNotEmpty(groupInfo.getExtList())) {
- for (InlongGroupExtInfo extInfo : groupInfo.getExtList()) {
- if (InlongGroupSettings.PULSAR_SERVICE_URL.equals(extInfo.getKeyName())
- && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
- serviceUrl = extInfo.getKeyValue();
- }
- if (InlongGroupSettings.PULSAR_AUTHENTICATION.equals(extInfo.getKeyName())
- && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
- authentication = extInfo.getKeyValue();
- }
- if (InlongGroupSettings.PULSAR_ADMIN_URL.equals(extInfo.getKeyName())
- && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
- adminUrl = extInfo.getKeyValue();
- }
- }
- }
- if (StringUtils.isEmpty(adminUrl)) {
- adminUrl = pulsarAdminUrl;
- }
- if (StringUtils.isEmpty(serviceUrl)) {
- serviceUrl = pulsarServiceUrl;
- }
- return new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup,
- deserializationInfo, fieldInfos.toArray(new FieldInfo[0]), authentication);
+ return new PulsarSourceInfo(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getBrokerServiceUrl(),
+ fullTopicName, consumerGroup, deserializationInfo, fieldInfos.toArray(new FieldInfo[0]),
+ pulsarClusterInfo.getToken());
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 19c6f26..1e0d757 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -17,11 +17,8 @@
package org.apache.inlong.manager.service.workflow.consumption.listener;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
@@ -46,6 +43,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
/**
* Added data consumption process complete archive event listener
*/
@@ -117,9 +119,8 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
String mqResourceObj = groupInfo.getMqResourceObj();
Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId);
-
- try (PulsarAdmin pulsarAdmin = PulsarUtils
- .getPulsarAdmin(groupInfo, commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
PulsarTopicBean topicMessage = new PulsarTopicBean();
String tenant = clusterBean.getDefaultTenant();
topicMessage.setTenant(tenant);
@@ -129,7 +130,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
String consumerGroup = entity.getConsumerGroupId();
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
List<String> topics = Arrays.asList(entity.getTopic().split(","));
- this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, groupInfo);
+ this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, globalCluster);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
@@ -138,11 +139,13 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
}
private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean,
- List<String> clusters, List<String> topics, InlongGroupInfo groupInfo) {
+ List<String> clusters, List<String> topics, PulsarClusterInfo globalCluster) {
try {
for (String cluster : clusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo, serviceUrl)) {
+ PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+ .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java
index a9c6ae0..e516e4c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java
@@ -18,8 +18,6 @@
package org.apache.inlong.manager.service.thirdparty.mq.util;
import com.google.common.collect.Lists;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
@@ -32,6 +30,9 @@ import org.junit.Assert;
import org.junit.Test;
import org.springframework.util.ReflectionUtils;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+
public class PulsarUtilsTest {
@Test
@@ -51,7 +52,7 @@ public class PulsarUtilsTest {
groupInfo.setExtList(groupExtInfoList);
final String defaultServiceUrl = "http://127.0.0.1:10080";
try {
- PulsarAdmin admin = PulsarUtils.getPulsarAdmin(groupInfo, defaultServiceUrl);
+ PulsarAdmin admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
Assert.assertEquals("http://127.0.0.1:8080", admin.getServiceUrl());
Field auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
assert auth != null;
@@ -66,7 +67,7 @@ public class PulsarUtilsTest {
groupExtInfo3.setKeyValue("token1");
groupExtInfoList.add(groupExtInfo3);
try {
- admin = PulsarUtils.getPulsarAdmin(groupInfo, defaultServiceUrl);
+ admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
} catch (Exception e) {
if (e instanceof IllegalArgumentException) {
Assert.assertTrue(e.getMessage().contains("illegal authentication type"));
@@ -75,7 +76,7 @@ public class PulsarUtilsTest {
groupExtInfoList = new ArrayList<>();
groupInfo.setExtList(groupExtInfoList);
- admin = PulsarUtils.getPulsarAdmin(groupInfo, defaultServiceUrl);
+ admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
Assert.assertEquals("http://127.0.0.1:10080", admin.getServiceUrl());
auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
assert auth != null;