You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/07 09:58:04 UTC

[incubator-inlong] branch master updated: [INLONG-4520][Audit] Audit-proxy supports Pulsar authenticate (#4522)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a0657a010 [INLONG-4520][Audit] Audit-proxy supports Pulsar authenticate (#4522)
a0657a010 is described below

commit a0657a010f4e5abae6ac69faa5c5cd98d1c3a83b
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Tue Jun 7 17:57:59 2022 +0800

    [INLONG-4520][Audit] Audit-proxy supports Pulsar authenticate (#4522)
    
    Co-authored-by: doleyzi <do...@tencent.com>
---
 .../audit/sink/pulsar/PulsarClientService.java     | 48 +++++++++++++++-------
 inlong-audit/conf/audit-proxy-pulsar.conf          |  4 ++
 2 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
index efbd82156..73b2f627f 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
@@ -18,10 +18,13 @@
 package org.apache.inlong.audit.sink.pulsar;
 
 import com.google.common.base.Preconditions;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -29,6 +32,8 @@ import org.apache.inlong.audit.consts.AttributeConstants;
 import org.apache.inlong.audit.sink.EventStat;
 import org.apache.inlong.audit.utils.LogCounter;
 import org.apache.inlong.audit.utils.NetworkUtils;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -46,7 +51,8 @@ public class PulsarClientService {
      * properties key for pulsar client
      */
     private static String PULSAR_SERVER_URL = "pulsar_server_url";
-
+    private static String PULSAR_ENABLE_AUTH = "enable_token_auth";
+    private static String PULSAR_ENABLE_AUTH_TOKEN = "auth_token";
     /*
      * properties key pulsar producer
      */
@@ -64,6 +70,8 @@ public class PulsarClientService {
     private static int DEFAULT_MAX_PENDING_MESSAGES = 10000;
     private static int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
 
+    private static boolean DEFAULT_PULSAR_ENABLE_TOKEN_AUTH = false;
+    private static String DEFAULT_PULSAR_TOKEN_AUTH = "";
     /*
      * for producer
      */
@@ -76,11 +84,14 @@ public class PulsarClientService {
     public ConcurrentHashMap<String, Producer> producerInfoMap;
     public PulsarClient pulsarClient;
     public String pulsarServerUrl;
+    public boolean pulsarEnableTokenAuth;
+    public String pulsarTokenAuth;
 
     private String localIp = "127.0.0.1";
 
     /**
      * pulsar client service
+     *
      * @param context
      */
     public PulsarClientService(Context context) {
@@ -96,14 +107,17 @@ public class PulsarClientService {
         enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
         blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, DEFAULT_BLOCK_IF_QUEUE_FULL);
         maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, DEFAULT_MAX_PENDING_MESSAGES);
-        maxBatchingMessages =  context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
+        maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
         producerInfoMap = new ConcurrentHashMap<>();
         localIp = NetworkUtils.getLocalIp();
 
+        pulsarEnableTokenAuth = context.getBoolean(PULSAR_ENABLE_AUTH, DEFAULT_PULSAR_ENABLE_TOKEN_AUTH);
+        pulsarTokenAuth = context.getString(PULSAR_ENABLE_AUTH_TOKEN, DEFAULT_PULSAR_TOKEN_AUTH);
     }
 
     /**
      * init connection
+     *
      * @param callBack
      */
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -117,6 +131,7 @@ public class PulsarClientService {
 
     /**
      * send message
+     *
      * @param topic
      * @param event
      * @param sendMessageCallBack
@@ -124,7 +139,7 @@ public class PulsarClientService {
      * @return
      */
     public boolean sendMessage(String topic, Event event,
-            SendMessageCallBack sendMessageCallBack, EventStat es) {
+                               SendMessageCallBack sendMessageCallBack, EventStat es) {
         Producer producer = null;
         try {
             producer = getProducer(topic);
@@ -155,12 +170,12 @@ public class PulsarClientService {
         logger.debug("producer send msg!");
         producer.newMessage().properties(proMap).value(event.getBody())
                 .sendAsync().thenAccept((msgId) -> {
-            sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl)msgId, es);
+                    sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl) msgId, es);
 
-        }).exceptionally((e) -> {
-            sendMessageCallBack.handleMessageSendException(es, e);
-            return null;
-        });
+                }).exceptionally((e) -> {
+                    sendMessageCallBack.handleMessageSendException(es, e);
+                    return null;
+                });
         return true;
     }
 
@@ -194,10 +209,15 @@ public class PulsarClientService {
     }
 
     private PulsarClient initPulsarClient(String pulsarUrl) throws Exception {
-        return PulsarClient.builder()
-                .serviceUrl(pulsarUrl)
-                .connectionTimeout(clientOpTimeout, TimeUnit.SECONDS)
-                .build();
+        PulsarClient pulsarClient = null;
+        ClientBuilder builder = PulsarClient.builder();
+        if (pulsarEnableTokenAuth && StringUtils.isNotEmpty(pulsarTokenAuth)) {
+            builder.authentication(AuthenticationFactory.token(pulsarTokenAuth));
+        }
+        pulsarClient = builder.serviceUrl(pulsarUrl)
+                .connectionTimeout(clientOpTimeout, TimeUnit.SECONDS).build();
+
+        return pulsarClient;
     }
 
     public Producer initTopicProducer(String topic) {
@@ -205,7 +225,7 @@ public class PulsarClientService {
         Producer producer = null;
         try {
             producer = pulsarClient.newProducer().sendTimeout(sendTimeout,
-                    TimeUnit.MILLISECONDS)
+                            TimeUnit.MILLISECONDS)
                     .topic(topic)
                     .enableBatching(enableBatch)
                     .blockIfQueueFull(blockIfQueueFull)
@@ -224,7 +244,7 @@ public class PulsarClientService {
 
     public void closeTopicProducer(String topic) {
         logger.info("closeTopicProducer topic = {}", topic);
-        Producer producer  = producerInfoMap.remove(topic);
+        Producer producer = producerInfoMap.remove(topic);
         if (producer != null) {
             producer.closeAsync();
         }
diff --git a/inlong-audit/conf/audit-proxy-pulsar.conf b/inlong-audit/conf/audit-proxy-pulsar.conf
index 05a6d7bf5..aea1ab5ba 100644
--- a/inlong-audit/conf/audit-proxy-pulsar.conf
+++ b/inlong-audit/conf/audit-proxy-pulsar.conf
@@ -61,6 +61,8 @@ agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.audit.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg1.pulsar_server_url = pulsar://localhost:6650
 agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/inlong-audit
+agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
+agent1.sinks.pulsar-sink-msg1.auth_token =
 agent1.sinks.pulsar-sink-msg1.send_timeout_ms = 30000
 agent1.sinks.pulsar-sink-msg1.client_op_timeout_second = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
@@ -77,6 +79,8 @@ agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.audit.sink.PulsarSink
 agent1.sinks.pulsar-sink-msg2.pulsar_server_url = pulsar://localhost:6650
 agent1.sinks.pulsar-sink-msg2.topic = persistent://public/default/inlong-audit
+agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
+agent1.sinks.pulsar-sink-msg1.auth_token =
 agent1.sinks.pulsar-sink-msg2.send_timeout_ms = 30000
 agent1.sinks.pulsar-sink-msg2.client_op_timeout_second = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60