You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/07 09:58:04 UTC
[incubator-inlong] branch master updated: [INLONG-4520][Audit] Audit-proxy supports Pulsar authenticate (#4522)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a0657a010 [INLONG-4520][Audit] Audit-proxy supports Pulsar authenticate (#4522)
a0657a010 is described below
commit a0657a010f4e5abae6ac69faa5c5cd98d1c3a83b
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Tue Jun 7 17:57:59 2022 +0800
[INLONG-4520][Audit] Audit-proxy supports Pulsar authenticate (#4522)
Co-authored-by: doleyzi <do...@tencent.com>
---
.../audit/sink/pulsar/PulsarClientService.java | 48 +++++++++++++++-------
inlong-audit/conf/audit-proxy-pulsar.conf | 4 ++
2 files changed, 38 insertions(+), 14 deletions(-)
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
index efbd82156..73b2f627f 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
@@ -18,10 +18,13 @@
package org.apache.inlong.audit.sink.pulsar;
import com.google.common.base.Preconditions;
+
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
@@ -29,6 +32,8 @@ import org.apache.inlong.audit.consts.AttributeConstants;
import org.apache.inlong.audit.sink.EventStat;
import org.apache.inlong.audit.utils.LogCounter;
import org.apache.inlong.audit.utils.NetworkUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -46,7 +51,8 @@ public class PulsarClientService {
* properties key for pulsar client
*/
private static String PULSAR_SERVER_URL = "pulsar_server_url";
-
+ private static String PULSAR_ENABLE_AUTH = "enable_token_auth";
+ private static String PULSAR_ENABLE_AUTH_TOKEN = "auth_token";
/*
* properties key pulsar producer
*/
@@ -64,6 +70,8 @@ public class PulsarClientService {
private static int DEFAULT_MAX_PENDING_MESSAGES = 10000;
private static int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+ private static boolean DEFAULT_PULSAR_ENABLE_TOKEN_AUTH = false;
+ private static String DEFAULT_PULSAR_TOKEN_AUTH = "";
/*
* for producer
*/
@@ -76,11 +84,14 @@ public class PulsarClientService {
public ConcurrentHashMap<String, Producer> producerInfoMap;
public PulsarClient pulsarClient;
public String pulsarServerUrl;
+ public boolean pulsarEnableTokenAuth;
+ public String pulsarTokenAuth;
private String localIp = "127.0.0.1";
/**
* pulsar client service
+ *
* @param context
*/
public PulsarClientService(Context context) {
@@ -96,14 +107,17 @@ public class PulsarClientService {
enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, DEFAULT_BLOCK_IF_QUEUE_FULL);
maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, DEFAULT_MAX_PENDING_MESSAGES);
- maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
+ maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
producerInfoMap = new ConcurrentHashMap<>();
localIp = NetworkUtils.getLocalIp();
+ pulsarEnableTokenAuth = context.getBoolean(PULSAR_ENABLE_AUTH, DEFAULT_PULSAR_ENABLE_TOKEN_AUTH);
+ pulsarTokenAuth = context.getString(PULSAR_ENABLE_AUTH_TOKEN, DEFAULT_PULSAR_TOKEN_AUTH);
}
/**
* init connection
+ *
* @param callBack
*/
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -117,6 +131,7 @@ public class PulsarClientService {
/**
* send message
+ *
* @param topic
* @param event
* @param sendMessageCallBack
@@ -124,7 +139,7 @@ public class PulsarClientService {
* @return
*/
public boolean sendMessage(String topic, Event event,
- SendMessageCallBack sendMessageCallBack, EventStat es) {
+ SendMessageCallBack sendMessageCallBack, EventStat es) {
Producer producer = null;
try {
producer = getProducer(topic);
@@ -155,12 +170,12 @@ public class PulsarClientService {
logger.debug("producer send msg!");
producer.newMessage().properties(proMap).value(event.getBody())
.sendAsync().thenAccept((msgId) -> {
- sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl)msgId, es);
+ sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl) msgId, es);
- }).exceptionally((e) -> {
- sendMessageCallBack.handleMessageSendException(es, e);
- return null;
- });
+ }).exceptionally((e) -> {
+ sendMessageCallBack.handleMessageSendException(es, e);
+ return null;
+ });
return true;
}
@@ -194,10 +209,15 @@ public class PulsarClientService {
}
private PulsarClient initPulsarClient(String pulsarUrl) throws Exception {
- return PulsarClient.builder()
- .serviceUrl(pulsarUrl)
- .connectionTimeout(clientOpTimeout, TimeUnit.SECONDS)
- .build();
+ PulsarClient pulsarClient = null;
+ ClientBuilder builder = PulsarClient.builder();
+ if (pulsarEnableTokenAuth && StringUtils.isNotEmpty(pulsarTokenAuth)) {
+ builder.authentication(AuthenticationFactory.token(pulsarTokenAuth));
+ }
+ pulsarClient = builder.serviceUrl(pulsarUrl)
+ .connectionTimeout(clientOpTimeout, TimeUnit.SECONDS).build();
+
+ return pulsarClient;
}
public Producer initTopicProducer(String topic) {
@@ -205,7 +225,7 @@ public class PulsarClientService {
Producer producer = null;
try {
producer = pulsarClient.newProducer().sendTimeout(sendTimeout,
- TimeUnit.MILLISECONDS)
+ TimeUnit.MILLISECONDS)
.topic(topic)
.enableBatching(enableBatch)
.blockIfQueueFull(blockIfQueueFull)
@@ -224,7 +244,7 @@ public class PulsarClientService {
public void closeTopicProducer(String topic) {
logger.info("closeTopicProducer topic = {}", topic);
- Producer producer = producerInfoMap.remove(topic);
+ Producer producer = producerInfoMap.remove(topic);
if (producer != null) {
producer.closeAsync();
}
diff --git a/inlong-audit/conf/audit-proxy-pulsar.conf b/inlong-audit/conf/audit-proxy-pulsar.conf
index 05a6d7bf5..aea1ab5ba 100644
--- a/inlong-audit/conf/audit-proxy-pulsar.conf
+++ b/inlong-audit/conf/audit-proxy-pulsar.conf
@@ -61,6 +61,8 @@ agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.audit.sink.PulsarSink
agent1.sinks.pulsar-sink-msg1.pulsar_server_url = pulsar://localhost:6650
agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/inlong-audit
+agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
+agent1.sinks.pulsar-sink-msg1.auth_token =
agent1.sinks.pulsar-sink-msg1.send_timeout_ms = 30000
agent1.sinks.pulsar-sink-msg1.client_op_timeout_second = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
@@ -77,6 +79,8 @@ agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.audit.sink.PulsarSink
agent1.sinks.pulsar-sink-msg2.pulsar_server_url = pulsar://localhost:6650
agent1.sinks.pulsar-sink-msg2.topic = persistent://public/default/inlong-audit
+agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
+agent1.sinks.pulsar-sink-msg1.auth_token =
agent1.sinks.pulsar-sink-msg2.send_timeout_ms = 30000
agent1.sinks.pulsar-sink-msg2.client_op_timeout_second = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60