You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/12 17:07:59 UTC

[pinot] branch master updated: Handle authentication in pulsar pinot connector (#8338)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 348353be88 Handle authentication in pulsar pinot connector (#8338)
348353be88 is described below

commit 348353be88b6f1864bff5d474ecc9f9c254a9eda
Author: mathieudruart <ma...@users.noreply.github.com>
AuthorDate: Tue Apr 12 13:07:52 2022 -0400

    Handle authentication in pulsar pinot connector (#8338)
---
 .../pinot/plugin/stream/pulsar/PulsarConfig.java      | 19 +++++++++++++++++++
 .../pulsar/PulsarPartitionLevelConnectionHandler.java | 15 ++++++++++++++-
 .../pulsar/PulsarStreamLevelConsumerManager.java      | 17 ++++++++++++++++-
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 2ce2c7551a..78db1b766d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -34,12 +34,16 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 public class PulsarConfig {
   public static final String STREAM_TYPE = "pulsar";
   public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+  public static final String AUTHENTICATION_TOKEN = "authenticationToken";
+  public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
 
   private String _pulsarTopicName;
   private String _subscriberId;
   private String _bootstrapServers;
   private MessageId _initialMessageId;
   private SubscriptionInitialPosition _subscriptionInitialPosition;
+  private String _authenticationToken;
+  private String _tlsTrustCertsFilePath;
 
   public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
@@ -48,6 +52,13 @@ public class PulsarConfig {
         streamConfigMap.get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, BOOTSTRAP_SERVERS));
     _subscriberId = subscriberId;
 
+    String authenticationTokenKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, AUTHENTICATION_TOKEN);
+    _authenticationToken = streamConfigMap.get(authenticationTokenKey);
+
+    String tlsTrustCertsFilePathKey = StreamConfigProperties.
+            constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
+    _tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
+
     Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");
 
     OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
@@ -75,4 +86,12 @@ public class PulsarConfig {
   public SubscriptionInitialPosition getInitialSubscriberPosition() {
    return _subscriptionInitialPosition;
   }
+
+  public String getAuthenticationToken() {
+    return _authenticationToken;
+  }
+
+  public String getTlsTrustCertsFilePath() {
+    return _tlsTrustCertsFilePath;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 47dbd97ed4..2aaf9d5694 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -21,6 +21,9 @@ package org.apache.pinot.plugin.stream.pulsar;
 import java.io.IOException;
 import java.util.List;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.slf4j.Logger;
@@ -50,7 +53,17 @@ public class PulsarPartitionLevelConnectionHandler {
     _topic = _config.getPulsarTopicName();
 
     try {
-      _pulsarClient = PulsarClient.builder().serviceUrl(_config.getBootstrapServers()).build();
+      ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
+      if (_config.getTlsTrustCertsFilePath() != null) {
+        pulsarClientBuilder.tlsTrustCertsFilePath(_config.getTlsTrustCertsFilePath());
+      }
+
+      if (_config.getAuthenticationToken() != null) {
+        Authentication authentication = AuthenticationFactory.token(_config.getAuthenticationToken());
+        pulsarClientBuilder.authentication(authentication);
+      }
+
+      _pulsarClient = pulsarClientBuilder.build();
 
       _reader = _pulsarClient.newReader().topic(getPartitionedTopicName(partition))
           .startMessageId(_config.getInitialMessageId()).startMessageIdInclusive().create();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
index 325ebccef4..ff23a0cea6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
@@ -25,6 +25,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
@@ -75,7 +78,19 @@ public class PulsarStreamLevelConsumerManager {
 
       // Create the consumer
       try {
-        _pulsarClient = PulsarClient.builder().serviceUrl(pulsarStreamLevelStreamConfig.getBootstrapServers()).build();
+        ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(
+                pulsarStreamLevelStreamConfig.getBootstrapServers());
+        if (pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath() != null) {
+          pulsarClientBuilder.tlsTrustCertsFilePath(pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath());
+        }
+
+        if (pulsarStreamLevelStreamConfig.getAuthenticationToken() != null) {
+          Authentication authentication = AuthenticationFactory.token(
+                  pulsarStreamLevelStreamConfig.getAuthenticationToken());
+          pulsarClientBuilder.authentication(authentication);
+        }
+
+        _pulsarClient = pulsarClientBuilder.build();
 
         _reader = _pulsarClient.newReader().topic(pulsarStreamLevelStreamConfig.getPulsarTopicName())
             .startMessageId(pulsarStreamLevelStreamConfig.getInitialMessageId()).create();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org