You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/12 17:07:59 UTC
[pinot] branch master updated: Handle authentication in pulsar pinot connector (#8338)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 348353be88 Handle authentication in pulsar pinot connector (#8338)
348353be88 is described below
commit 348353be88b6f1864bff5d474ecc9f9c254a9eda
Author: mathieudruart <ma...@users.noreply.github.com>
AuthorDate: Tue Apr 12 13:07:52 2022 -0400
Handle authentication in pulsar pinot connector (#8338)
---
.../pinot/plugin/stream/pulsar/PulsarConfig.java | 19 +++++++++++++++++++
.../pulsar/PulsarPartitionLevelConnectionHandler.java | 15 ++++++++++++++-
.../pulsar/PulsarStreamLevelConsumerManager.java | 17 ++++++++++++++++-
3 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 2ce2c7551a..78db1b766d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -34,12 +34,16 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
public class PulsarConfig {
public static final String STREAM_TYPE = "pulsar";
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+ public static final String AUTHENTICATION_TOKEN = "authenticationToken";
+ public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
private String _pulsarTopicName;
private String _subscriberId;
private String _bootstrapServers;
private MessageId _initialMessageId;
private SubscriptionInitialPosition _subscriptionInitialPosition;
+ private String _authenticationToken;
+ private String _tlsTrustCertsFilePath;
public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
@@ -48,6 +52,13 @@ public class PulsarConfig {
streamConfigMap.get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, BOOTSTRAP_SERVERS));
_subscriberId = subscriberId;
+ String authenticationTokenKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, AUTHENTICATION_TOKEN);
+ _authenticationToken = streamConfigMap.get(authenticationTokenKey);
+
+ String tlsTrustCertsFilePathKey = StreamConfigProperties.
+ constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
+ _tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
+
Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");
OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
@@ -75,4 +86,12 @@ public class PulsarConfig {
public SubscriptionInitialPosition getInitialSubscriberPosition() {
return _subscriptionInitialPosition;
}
+
+ public String getAuthenticationToken() {
+ return _authenticationToken;
+ }
+
+ public String getTlsTrustCertsFilePath() {
+ return _tlsTrustCertsFilePath;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 47dbd97ed4..2aaf9d5694 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -21,6 +21,9 @@ package org.apache.pinot.plugin.stream.pulsar;
import java.io.IOException;
import java.util.List;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
@@ -50,7 +53,17 @@ public class PulsarPartitionLevelConnectionHandler {
_topic = _config.getPulsarTopicName();
try {
- _pulsarClient = PulsarClient.builder().serviceUrl(_config.getBootstrapServers()).build();
+ ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
+ if (_config.getTlsTrustCertsFilePath() != null) {
+ pulsarClientBuilder.tlsTrustCertsFilePath(_config.getTlsTrustCertsFilePath());
+ }
+
+ if (_config.getAuthenticationToken() != null) {
+ Authentication authentication = AuthenticationFactory.token(_config.getAuthenticationToken());
+ pulsarClientBuilder.authentication(authentication);
+ }
+
+ _pulsarClient = pulsarClientBuilder.build();
_reader = _pulsarClient.newReader().topic(getPartitionedTopicName(partition))
.startMessageId(_config.getInitialMessageId()).startMessageIdInclusive().create();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
index 325ebccef4..ff23a0cea6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java
@@ -25,6 +25,9 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
@@ -75,7 +78,19 @@ public class PulsarStreamLevelConsumerManager {
// Create the consumer
try {
- _pulsarClient = PulsarClient.builder().serviceUrl(pulsarStreamLevelStreamConfig.getBootstrapServers()).build();
+ ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(
+ pulsarStreamLevelStreamConfig.getBootstrapServers());
+ if (pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath() != null) {
+ pulsarClientBuilder.tlsTrustCertsFilePath(pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath());
+ }
+
+ if (pulsarStreamLevelStreamConfig.getAuthenticationToken() != null) {
+ Authentication authentication = AuthenticationFactory.token(
+ pulsarStreamLevelStreamConfig.getAuthenticationToken());
+ pulsarClientBuilder.authentication(authentication);
+ }
+
+ _pulsarClient = pulsarClientBuilder.build();
_reader = _pulsarClient.newReader().topic(pulsarStreamLevelStreamConfig.getPulsarTopicName())
.startMessageId(pulsarStreamLevelStreamConfig.getInitialMessageId()).create();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org