You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/01/23 02:04:29 UTC

[incubator-inlong] branch master updated: [INLONG-2218][DataProxy] Inlong-DataProxy support authentication access Pulsar (#2262)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a24256f  [INLONG-2218][DataProxy] Inlong-DataProxy support authentication access Pulsar (#2262)
a24256f is described below

commit a24256fdaabc100a3a66551dc148cabb12eb17e1
Author: baomingyu <ba...@163.com>
AuthorDate: Sun Jan 23 10:04:26 2022 +0800

    [INLONG-2218][DataProxy] Inlong-DataProxy support authentication access Pulsar (#2262)
    
    * add pulsar client auth config in dataproxy module
    
    * modify example for pulsar auth config
    
    * modify pulsar token default value
    
    * change code style
---
 .../conf/flume-mulit-pulsar-http-example.conf      | 20 ++++++++++++----
 .../conf/flume-mulit-pulsar-tcp-example.conf       | 15 +++++++++++-
 .../conf/flume-mulit-pulsar-udp-example.conf       | 28 +++++++++++++++-------
 .../dataproxy/sink/pulsar/PulsarClientService.java | 22 +++++++++++++----
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
index 09ce407..b637090 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
@@ -78,6 +78,9 @@ agent1.channels.ch-msg6.fsyncInterval = 10
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg1.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg1.pulsar_token =
 agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,11 +88,14 @@ agent1.sinks.pulsar-sink-msg1.client-id-cache = true
 agent1.sinks.pulsar-sink-msg1.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg1.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg1.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg2.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg2.pulsar_token =
 agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -97,11 +103,14 @@ agent1.sinks.pulsar-sink-msg2.client-id-cache = true
 agent1.sinks.pulsar-sink-msg2.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg2.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg2.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg5.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg5.pulsar_token =
 agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -109,11 +118,14 @@ agent1.sinks.pulsar-sink-msg5.client-id-cache = true
 agent1.sinks.pulsar-sink-msg5.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg5.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg5.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
 agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg6.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg6.pulsar_token =
 agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg6.thread-num = 8
@@ -121,4 +133,4 @@ agent1.sinks.pulsar-sink-msg6.client-id-cache = true
 agent1.sinks.pulsar-sink-msg6.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg6.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg6.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec=20000000
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec = 20000000
\ No newline at end of file
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
index 3658f6c..78fb34a 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
@@ -79,6 +79,10 @@ agent1.channels.ch-msg6.fsyncInterval = 10
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# (token auth is applied only when pulsar_auth_type is "token", the default, and pulsar_token is non-empty)
+# agent1.sinks.pulsar-sink-msg1.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg1.pulsar_token =
 agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -86,11 +90,14 @@ agent1.sinks.pulsar-sink-msg1.client-id-cache = true
 agent1.sinks.pulsar-sink-msg1.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg1.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg1.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg2.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg2.pulsar_token =
 agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -103,6 +110,9 @@ agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg5.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg5.pulsar_token =
 agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -115,6 +125,9 @@ agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
 agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg6.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg6.pulsar_token =
 agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg6.thread-num = 8
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
index c2c688d..6389395 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
@@ -42,10 +42,10 @@ agent1.sources.upd-source.sendBufferSize = 1048576
 agent1.sources.upd-source.custom-cp = true
 agent1.sources.upd-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
 agent1.sources.upd-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.upd-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.upd-source.metric-agent-port=8003
-agent1.sources.upd-source.metric-cache-size=1000000
-agent1.sources.upd-source.set=10
+agent1.sources.upd-source.metric-recovery-path = /data/DataProxy/file/recovery
+agent1.sources.upd-source.metric-agent-port = 8003
+agent1.sources.upd-source.metric-cache-size = 1000000
+agent1.sources.upd-source.set = 10
 
 agent1.channels.ch-msg1.type = memory
 agent1.channels.ch-msg1.capacity = 10000
@@ -79,6 +79,9 @@ agent1.channels.ch-msg6.fsyncInterval = 10
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg1.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg1.pulsar_token =
 agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -86,11 +89,14 @@ agent1.sinks.pulsar-sink-msg1.client-id-cache = true
 agent1.sinks.pulsar-sink-msg1.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg1.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg1.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg2.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg2.pulsar_token =
 agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -98,11 +104,14 @@ agent1.sinks.pulsar-sink-msg2.client-id-cache = true
 agent1.sinks.pulsar-sink-msg2.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg2.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg2.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg5.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg5.pulsar_token =
 agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -110,11 +119,14 @@ agent1.sinks.pulsar-sink-msg5.client-id-cache = true
 agent1.sinks.pulsar-sink-msg5.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg5.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg5.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
 agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg6.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg6.pulsar_token =
 agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg6.thread-num = 8
@@ -122,4 +134,4 @@ agent1.sinks.pulsar-sink-msg6.client-id-cache = true
 agent1.sinks.pulsar-sink-msg6.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg6.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg6.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec=20000000
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec = 20000000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 83b57a0..d1155cc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -33,6 +34,8 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.EventStat;
 import org.apache.inlong.commons.monitor.LogCounter;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -50,6 +53,9 @@ public class PulsarClientService {
      * properties key for pulsar client
      */
     private static String PULSAR_SERVER_URL_LIST = "pulsar_server_url_list";
+    private static String PULSAR_TOKEN = "pulsar_token";
+    private static String PULSAR_AUTH_TYPE = "pulsar_auth_type";
+    private static String PULSAR_DEFAULT_AUTH_TYPE = "token";
 
     /*
      * properties key pulsar producer
@@ -74,7 +80,8 @@ public class PulsarClientService {
      * for pulsar client
      */
     private String[] pulsarServerUrls;
-
+    private String token;
+    private String authType;
     /*
      * for producer
      */
@@ -100,6 +107,8 @@ public class PulsarClientService {
         String pulsarServerUrlList = context.getString(PULSAR_SERVER_URL_LIST);
         Preconditions.checkState(pulsarServerUrlList != null, "No pulsar server url specified");
         pulsarServerUrls = pulsarServerUrlList.split("\\|");
+        token = context.getString(PULSAR_TOKEN);
+        authType = context.getString(PULSAR_AUTH_TYPE, PULSAR_DEFAULT_AUTH_TYPE);
         Preconditions.checkState(pulsarServerUrls != null && pulsarServerUrls.length > 0, "No "
                 + "pulsar server url config");
         sendTimeout = context.getInteger(SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT_MILL);
@@ -228,10 +237,13 @@ public class PulsarClientService {
     }
 
     private PulsarClient initPulsarClient(String pulsarUrl) throws Exception {
-        return PulsarClient.builder()
-                .serviceUrl(pulsarUrl)
-                .connectionTimeout(clientTimeout, TimeUnit.SECONDS)
-                .build();
+        ClientBuilder builder = PulsarClient.builder();
+        if (PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
+            builder.authentication(AuthenticationFactory.token(token));
+        }
+        builder.serviceUrl(pulsarUrl)
+                .connectionTimeout(clientTimeout, TimeUnit.SECONDS);
+        return builder.build();
     }
 
     public List<TopicProducerInfo> initTopicProducer(String topic) {