You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/01/23 02:04:29 UTC
[incubator-inlong] branch master updated: [INLONG-2218][DataProxy] Inlong-DataProxy support authentication access Pulsar (#2262)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a24256f [INLONG-2218][DataProxy] Inlong-DataProxy support authentication access Pulsar (#2262)
a24256f is described below
commit a24256fdaabc100a3a66551dc148cabb12eb17e1
Author: baomingyu <ba...@163.com>
AuthorDate: Sun Jan 23 10:04:26 2022 +0800
[INLONG-2218][DataProxy] Inlong-DataProxy support authentication access Pulsar (#2262)
* Add Pulsar client auth config to the dataproxy module
* Update the example configs for the Pulsar auth config
* Modify the Pulsar token default value
* Fix code style
---
.../conf/flume-mulit-pulsar-http-example.conf | 20 ++++++++++++----
.../conf/flume-mulit-pulsar-tcp-example.conf | 15 +++++++++++-
.../conf/flume-mulit-pulsar-udp-example.conf | 28 +++++++++++++++-------
.../dataproxy/sink/pulsar/PulsarClientService.java | 22 +++++++++++++----
4 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
index 09ce407..b637090 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
@@ -78,6 +78,9 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg1.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg1.pulsar_token =
agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,11 +88,14 @@ agent1.sinks.pulsar-sink-msg1.client-id-cache = true
agent1.sinks.pulsar-sink-msg1.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg1.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg1.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg2.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg2.pulsar_token =
agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -97,11 +103,14 @@ agent1.sinks.pulsar-sink-msg2.client-id-cache = true
agent1.sinks.pulsar-sink-msg2.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg2.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg2.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg5.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg5.pulsar_token =
agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -109,11 +118,14 @@ agent1.sinks.pulsar-sink-msg5.client-id-cache = true
agent1.sinks.pulsar-sink-msg5.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg5.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg5.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg6.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg6.pulsar_token =
agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg6.thread-num = 8
@@ -121,4 +133,4 @@ agent1.sinks.pulsar-sink-msg6.client-id-cache = true
agent1.sinks.pulsar-sink-msg6.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg6.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg6.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec=20000000
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec = 20000000
\ No newline at end of file
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
index 3658f6c..78fb34a 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
@@ -79,6 +79,10 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg1.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg1.pulsar_token =
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -86,11 +90,14 @@ agent1.sinks.pulsar-sink-msg1.client-id-cache = true
agent1.sinks.pulsar-sink-msg1.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg1.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg1.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg2.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg2.pulsar_token =
agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -103,6 +110,9 @@ agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg5.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg5.pulsar_token =
agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -115,6 +125,9 @@ agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg6.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg6.pulsar_token =
agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg6.thread-num = 8
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
index c2c688d..6389395 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
@@ -42,10 +42,10 @@ agent1.sources.upd-source.sendBufferSize = 1048576
agent1.sources.upd-source.custom-cp = true
agent1.sources.upd-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
agent1.sources.upd-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.upd-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.upd-source.metric-agent-port=8003
-agent1.sources.upd-source.metric-cache-size=1000000
-agent1.sources.upd-source.set=10
+agent1.sources.upd-source.metric-recovery-path = /data/DataProxy/file/recovery
+agent1.sources.upd-source.metric-agent-port = 8003
+agent1.sources.upd-source.metric-cache-size = 1000000
+agent1.sources.upd-source.set = 10
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 10000
@@ -79,6 +79,9 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg1.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg1.pulsar_token =
agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -86,11 +89,14 @@ agent1.sinks.pulsar-sink-msg1.client-id-cache = true
agent1.sinks.pulsar-sink-msg1.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg1.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg1.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg2.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg2.pulsar_token =
agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -98,11 +104,14 @@ agent1.sinks.pulsar-sink-msg2.client-id-cache = true
agent1.sinks.pulsar-sink-msg2.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg2.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg2.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg5.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg5.pulsar_token =
agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -110,11 +119,14 @@ agent1.sinks.pulsar-sink-msg5.client-id-cache = true
agent1.sinks.pulsar-sink-msg5.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg5.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg5.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
+# Optional pulsar auth type and token config
+# agent1.sinks.pulsar-sink-msg6.pulsar_auth_type = token
+# agent1.sinks.pulsar-sink-msg6.pulsar_token =
agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg6.thread-num = 8
@@ -122,4 +134,4 @@ agent1.sinks.pulsar-sink-msg6.client-id-cache = true
agent1.sinks.pulsar-sink-msg6.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg6.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg6.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec=20000000
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec = 20000000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 83b57a0..d1155cc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
@@ -33,6 +34,8 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.EventStat;
import org.apache.inlong.commons.monitor.LogCounter;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -50,6 +53,9 @@ public class PulsarClientService {
* properties key for pulsar client
*/
private static String PULSAR_SERVER_URL_LIST = "pulsar_server_url_list";
+ private static String PULSAR_TOKEN = "pulsar_token";
+ private static String PULSAR_AUTH_TYPE = "pulsar_auth_type";
+ private static String PULSAR_DEFAULT_AUTH_TYPE = "token";
/*
* properties key for pulsar producer
@@ -74,7 +80,8 @@ public class PulsarClientService {
* for pulsar client
*/
private String[] pulsarServerUrls;
-
+ private String token;
+ private String authType;
/*
* for producer
*/
@@ -100,6 +107,8 @@ public class PulsarClientService {
String pulsarServerUrlList = context.getString(PULSAR_SERVER_URL_LIST);
Preconditions.checkState(pulsarServerUrlList != null, "No pulsar server url specified");
pulsarServerUrls = pulsarServerUrlList.split("\\|");
+ token = context.getString(PULSAR_TOKEN);
+ authType = context.getString(PULSAR_AUTH_TYPE, PULSAR_DEFAULT_AUTH_TYPE);
Preconditions.checkState(pulsarServerUrls != null && pulsarServerUrls.length > 0, "No "
+ "pulsar server url config");
sendTimeout = context.getInteger(SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT_MILL);
@@ -228,10 +237,13 @@ public class PulsarClientService {
}
private PulsarClient initPulsarClient(String pulsarUrl) throws Exception {
- return PulsarClient.builder()
- .serviceUrl(pulsarUrl)
- .connectionTimeout(clientTimeout, TimeUnit.SECONDS)
- .build();
+ ClientBuilder builder = PulsarClient.builder();
+ if (PULSAR_DEFAULT_AUTH_TYPE.equals(authType) && StringUtils.isNotEmpty(token)) {
+ builder.authentication(AuthenticationFactory.token(token));
+ }
+ builder.serviceUrl(pulsarUrl)
+ .connectionTimeout(clientTimeout, TimeUnit.SECONDS);
+ return builder.build();
}
public List<TopicProducerInfo> initTopicProducer(String topic) {