You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/19 09:36:53 UTC
[inlong] branch master updated: [INLONG-5604][Manager] Add token info for Pulsar source (#5609)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b1ebf9357 [INLONG-5604][Manager] Add token info for Pulsar source (#5609)
b1ebf9357 is described below
commit b1ebf9357cb8810ded71e1bce3e9ffa9134ed5e1
Author: healchow <he...@gmail.com>
AuthorDate: Fri Aug 19 17:36:49 2022 +0800
[INLONG-5604][Manager] Add token info for Pulsar source (#5609)
---
.../manager/service/source/pulsar/PulsarSourceOperator.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index b6b89b0b9..03fb7f96b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -56,6 +56,12 @@ import java.util.Map;
@Service
public class PulsarSourceOperator extends AbstractSourceOperator {
+ private static final String AUTH_CLASSNAME_KEY = "properties.auth-plugin-classname";
+ private static final String AUTH_CLASSNAME_VALUE = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
+ private static final String AUTH_PARAMS_KEY = "properties.auth-params";
+ // the %s must be replaced by the actual value
+ private static final String AUTH_PARAMS_VALUE = "token:%s";
+
@Autowired
private ObjectMapper objectMapper;
@Autowired
@@ -121,6 +127,13 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
+ // set the token info
+ if (StringUtils.isNotBlank(pulsarCluster.getToken())) {
+ Map<String, Object> properties = pulsarSource.getProperties();
+ properties.putIfAbsent(AUTH_CLASSNAME_KEY, AUTH_CLASSNAME_VALUE);
+ properties.putIfAbsent(AUTH_PARAMS_KEY, String.format(AUTH_PARAMS_VALUE, pulsarCluster.getToken()));
+ }
+
for (StreamSource sourceInfo : streamSources) {
if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
continue;