You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/19 09:36:53 UTC

[inlong] branch master updated: [INLONG-5604][Manager] Add token info for Pulsar source (#5609)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b1ebf9357 [INLONG-5604][Manager] Add token info for Pulsar source (#5609)
b1ebf9357 is described below

commit b1ebf9357cb8810ded71e1bce3e9ffa9134ed5e1
Author: healchow <he...@gmail.com>
AuthorDate: Fri Aug 19 17:36:49 2022 +0800

    [INLONG-5604][Manager] Add token info for Pulsar source (#5609)
---
 .../manager/service/source/pulsar/PulsarSourceOperator.java | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index b6b89b0b9..03fb7f96b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -56,6 +56,12 @@ import java.util.Map;
 @Service
 public class PulsarSourceOperator extends AbstractSourceOperator {
 
+    private static final String AUTH_CLASSNAME_KEY = "properties.auth-plugin-classname";
+    private static final String AUTH_CLASSNAME_VALUE = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
+    private static final String AUTH_PARAMS_KEY = "properties.auth-params";
+    // the %s placeholder must be replaced by the actual token value
+    private static final String AUTH_PARAMS_VALUE = "token:%s";
+
     @Autowired
     private ObjectMapper objectMapper;
     @Autowired
@@ -121,6 +127,13 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
             pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
 
+            // set the token auth properties when the cluster has a token, keeping any values already present
+            if (StringUtils.isNotBlank(pulsarCluster.getToken())) {
+                Map<String, Object> properties = pulsarSource.getProperties();
+                properties.putIfAbsent(AUTH_CLASSNAME_KEY, AUTH_CLASSNAME_VALUE);
+                properties.putIfAbsent(AUTH_PARAMS_KEY, String.format(AUTH_PARAMS_VALUE, pulsarCluster.getToken()));
+            }
+
             for (StreamSource sourceInfo : streamSources) {
                 if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
                     continue;