You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/12/18 09:39:58 UTC
(pinot) branch master updated: [feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fd45c1bbd2 [feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)
fd45c1bbd2 is described below
commit fd45c1bbd273dab29aebe4576cf0475d219e2fca
Author: Jeffrey Bolle <Je...@users.noreply.github.com>
AuthorDate: Mon Dec 18 04:39:52 2023 -0500
[feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)
* [feature] add support for StreamNative OAuth2 authentication for pulsar.
Addresses #12067
* add comments documenting new params.
* Add validation for provided OAuth2 config.
* checkstyle
* Added todo per PR comments.
---
.../pinot/plugin/stream/pulsar/PulsarConfig.java | 87 +++++++++++++++++-----
.../PulsarPartitionLevelConnectionHandler.java | 26 +++++++
.../plugin/stream/pulsar/PulsarConfigTest.java | 54 ++++++++++++++
3 files changed, 148 insertions(+), 19 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 8fefc0e7c5..8cdc8f8647 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -19,6 +19,9 @@
package org.apache.pinot.plugin.stream.pulsar;
import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -42,6 +45,10 @@ public class PulsarConfig {
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
public static final String AUTHENTICATION_TOKEN = "authenticationToken";
public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
+
+ public static final String OAUTH_ISSUER_URL = "issuerUrl";
+ public static final String OAUTH_CREDS_FILE_PATH = "credsFilePath";
+ public static final String OAUTH_AUDIENCE = "audience";
public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
public static final String METADATA_FIELDS = "metadata.fields"; //list of the metadata fields comma separated
@@ -52,6 +59,15 @@ public class PulsarConfig {
private final SubscriptionInitialPosition _subscriptionInitialPosition;
private final String _authenticationToken;
private final String _tlsTrustCertsFilePath;
+
+ private final String _issuerUrl; // OAUTH2 issuer URL example: "https://auth.streamnative.cloud"
+
+ // Absolute path of your downloaded key file on the local file system.
+ // example: file:///path/to/private_creds_file
+ //TODO: find a good way to support pushing this secret to all servers.
+ private final String _credentialsFilePath;
+ private final String _audience; // Audience for your OAUTH2 client: urn:sn:pulsar:test:test-cluster
+
// Deprecated since pulsar supports record key extraction
@Deprecated
private final boolean _enableKeyValueStitch;
@@ -59,38 +75,59 @@ public class PulsarConfig {
private final Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> _metadataFields;
public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
- _pulsarTopicName = streamConfig.getTopicName();
- _bootstrapServers =
- streamConfigMap.get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, BOOTSTRAP_SERVERS));
_subscriberId = subscriberId;
- String authenticationTokenKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, AUTHENTICATION_TOKEN);
- _authenticationToken = streamConfigMap.get(authenticationTokenKey);
-
- String tlsTrustCertsFilePathKey = StreamConfigProperties.
- constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
- _tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
-
- String enableKeyValueStitchKey = StreamConfigProperties.
- constructStreamProperty(STREAM_TYPE, ENABLE_KEY_VALUE_STITCH);
- _enableKeyValueStitch = Boolean.parseBoolean(streamConfigMap.get(enableKeyValueStitchKey));
-
+ _pulsarTopicName = streamConfig.getTopicName();
+ _bootstrapServers = getConfigValue(streamConfigMap, BOOTSTRAP_SERVERS);
Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");
+ _authenticationToken = getConfigValue(streamConfigMap, AUTHENTICATION_TOKEN);
+ _tlsTrustCertsFilePath = getConfigValue(streamConfigMap, TLS_TRUST_CERTS_FILE_PATH);
+ _enableKeyValueStitch = Boolean.parseBoolean(getConfigValue(streamConfigMap, ENABLE_KEY_VALUE_STITCH));
+
OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
_subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria);
_initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria);
- _populateMetadata = Boolean.parseBoolean(streamConfig.getStreamConfigsMap().getOrDefault(
- StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
- "false"));
- String metadataFieldsToExtractCSV = streamConfig.getStreamConfigsMap().getOrDefault(
- StreamConfigProperties.constructStreamProperty(STREAM_TYPE, METADATA_FIELDS), "");
+ _populateMetadata = Boolean.parseBoolean(getConfigValueOrDefault(streamConfigMap,
+ StreamConfigProperties.METADATA_POPULATE, "false"));
+ String metadataFieldsToExtractCSV = getConfigValueOrDefault(streamConfigMap, METADATA_FIELDS, "");
if (StringUtils.isBlank(metadataFieldsToExtractCSV) || !_populateMetadata) {
_metadataFields = Collections.emptySet();
} else {
_metadataFields = parseConfigStringToEnumSet(metadataFieldsToExtractCSV);
}
+ _issuerUrl = getConfigValue(streamConfigMap, OAUTH_ISSUER_URL);
+ _credentialsFilePath = getConfigValue(streamConfigMap, OAUTH_CREDS_FILE_PATH);
+ if (StringUtils.isNotBlank(_credentialsFilePath)) {
+ validateOAuthCredFile();
+ }
+ _audience = getConfigValue(streamConfigMap, OAUTH_AUDIENCE);
+ }
+
+  /**
+   * Validates the configured OAuth2 credentials location: it must parse as a URL, use the {@code file} protocol,
+   * and its path component must exist on the local file system.
+   *
+   * @throws IllegalArgumentException if the value is not a well-formed URL, is not a {@code file} URL, or the
+   *     referenced path does not exist
+   */
+  protected void validateOAuthCredFile() {
+    final URL parsedLocation;
+    try {
+      parsedLocation = new URL(_credentialsFilePath);
+    } catch (MalformedURLException mue) {
+      throw new IllegalArgumentException("Invalid credentials file path: " + _credentialsFilePath, mue);
+    }
+
+    boolean usesFileProtocol = "file".equals(parsedLocation.getProtocol());
+    if (!usesFileProtocol) {
+      throw new IllegalArgumentException("Invalid credentials file path: " + _credentialsFilePath
+          + ". URL protocol must be file://");
+    }
+
+    // Only the path component of the URL is used for the existence check.
+    boolean presentOnDisk = new File(parsedLocation.getPath()).exists();
+    if (!presentOnDisk) {
+      throw new IllegalArgumentException("Invalid credentials file path: " + _credentialsFilePath
+          + ". File does not exist.");
+    }
+  }
+
+  /**
+   * Reads the value stored under the property name that {@code StreamConfigProperties.constructStreamProperty}
+   * builds from {@code STREAM_TYPE} and {@code key}.
+   *
+   * @return the mapped value, or {@code null} if the map has no value for that property name
+   */
+  private String getConfigValue(Map<String, String> streamConfigMap, String key) {
+    String qualifiedKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, key);
+    return streamConfigMap.get(qualifiedKey);
+  }
+
+  /**
+   * Reads the value stored under the property name that {@code StreamConfigProperties.constructStreamProperty}
+   * builds from {@code STREAM_TYPE} and {@code key}, falling back to {@code defaultValue} via
+   * {@link Map#getOrDefault}.
+   */
+  private String getConfigValueOrDefault(Map<String, String> streamConfigMap, String key, String defaultValue) {
+    String qualifiedKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, key);
+    return streamConfigMap.getOrDefault(qualifiedKey, defaultValue);
+  }
private Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> parseConfigStringToEnumSet(
@@ -144,4 +181,16 @@ public class PulsarConfig {
public Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> getMetadataFields() {
return _metadataFields;
}
+
+  /** Returns the OAuth2 issuer URL read from the stream config, or {@code null} if it was not provided. */
+  public String getIssuerUrl() {
+    return _issuerUrl;
+  }
+
+  /**
+   * Returns the OAuth2 credentials file location read from the stream config, or {@code null} if it was not
+   * provided. A non-blank value is validated at construction time as a {@code file} URL.
+   */
+  public String getCredentialsFilePath() {
+    return _credentialsFilePath;
+  }
+
+  /** Returns the OAuth2 audience read from the stream config, or {@code null} if it was not provided. */
+  public String getAudience() {
+    return _audience;
+  }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 11033ec716..163400bc3b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -19,7 +19,11 @@
package org.apache.pinot.plugin.stream.pulsar;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.List;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -27,6 +31,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +65,7 @@ public class PulsarPartitionLevelConnectionHandler {
pulsarClientBuilder.authentication(authentication);
}
+ getAuthenticationFactory(_config).ifPresent(pulsarClientBuilder::authentication);
_pulsarClient = pulsarClientBuilder.build();
LOGGER.info("Created pulsar client {}", _pulsarClient);
} catch (Exception e) {
@@ -67,6 +73,26 @@ public class PulsarPartitionLevelConnectionHandler {
}
}
+  /**
+   * Builds the OAuth2 client-credentials authentication for the Pulsar client.
+   *
+   * <p>OAuth2 is only used when the issuer URL, the credentials file path and the audience are all non-blank;
+   * if any of them is missing, no OAuth2 authentication is produced.
+   *
+   * @param pulsarConfig config supplying the issuer URL, credentials file path and audience
+   * @return the OAuth2 authentication, or {@link Optional#empty()} when the OAuth2 settings are incomplete or
+   *     either URL is malformed (the latter is logged as an error)
+   */
+  private Optional<Authentication> getAuthenticationFactory(PulsarConfig pulsarConfig) {
+    String issuerUrl = pulsarConfig.getIssuerUrl();
+    String credentialsFilePath = pulsarConfig.getCredentialsFilePath();
+    String audience = pulsarConfig.getAudience();
+
+    // All three values are required by clientCredentials(); the previous check demanded that audience and
+    // credentials path be blank, so a complete OAuth2 configuration was never applied.
+    if (StringUtils.isBlank(issuerUrl) || StringUtils.isBlank(credentialsFilePath)
+        || StringUtils.isBlank(audience)) {
+      return Optional.empty();
+    }
+
+    try {
+      return Optional.of(AuthenticationFactoryOAuth2.clientCredentials(
+          new URL(issuerUrl),
+          new URL(credentialsFilePath),
+          audience));
+    } catch (MalformedURLException mue) {
+      LOGGER.error("Failed to create authentication factory for pulsar client with config: "
+              + "issuer: {}, credential file path: {}, audience: {}",
+          issuerUrl,
+          credentialsFilePath,
+          audience,
+          mue);
+      return Optional.empty();
+    }
+  }
protected Reader<byte[]> createReaderForPartition(String topic, int partition, MessageId initialMessageId) {
if (_pulsarClient == null) {
throw new RuntimeException("Failed to create reader as no pulsar client found for topic " + topic);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
index badbbbc5f7..3c7eeffe80 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
@@ -19,8 +19,12 @@
package org.apache.pinot.plugin.stream.pulsar;
import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -113,4 +117,54 @@ public class PulsarConfigTest {
Assert.assertFalse(pulsarConfig.isPopulateMetadata());
Assert.assertEquals(metadataFieldsToExtract.size(), 0);
}
+
+  /**
+   * Verifies that issuer URL, credentials file URL and audience are read back unchanged from the stream config
+   * when the credentials file exists.
+   */
+  @Test
+  public void testParsingConfigForOAuth() throws Exception {
+    Path testFile = null;
+    try {
+      testFile = Files.createTempFile("test_cred_file", ".json");
+      String credsFileUrl = "file://" + testFile.toFile().getAbsolutePath();
+
+      String issuerKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_ISSUER_URL);
+      String credsFileKey =
+          StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_CREDS_FILE_PATH);
+      String audienceKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_AUDIENCE);
+
+      Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+      streamConfigMap.put(issuerKey, "http://auth.test.com");
+      streamConfigMap.put(credsFileKey, credsFileUrl);
+      streamConfigMap.put(audienceKey, "urn:test:test");
+
+      PulsarConfig pulsarConfig = new PulsarConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap), "testId");
+
+      Assert.assertEquals(pulsarConfig.getIssuerUrl(), "http://auth.test.com");
+      Assert.assertEquals(pulsarConfig.getCredentialsFilePath(), credsFileUrl);
+      Assert.assertEquals(pulsarConfig.getAudience(), "urn:test:test");
+    } catch (Exception e) {
+      Assert.fail("Should not throw exception", e);
+    } finally {
+      // Remove the temporary credentials file if it was created.
+      if (testFile != null) {
+        testFile.toFile().delete();
+      }
+    }
+  }
+
+  /**
+   * Verifies that the {@link PulsarConfig} constructor rejects a credentials file URL whose path does not exist,
+   * throwing an {@link IllegalArgumentException} with the "File does not exist." message.
+   */
+  @Test
+  public void testParsingConfigFailFileValidationForOAuth() throws Exception {
+    String testFilePath = "file://path/to/file.json";
+
+    try {
+      Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_ISSUER_URL),
+          "http://auth.test.com");
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+              PulsarConfig.OAUTH_CREDS_FILE_PATH),
+          testFilePath);
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_AUDIENCE),
+          "urn:test:test");
+      StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
+      new PulsarConfig(streamConfig, "testId"); //will throw exception
+      // Without this, the test would pass silently if the constructor stopped validating the file.
+      // Assert.fail throws an AssertionError, which none of the catch clauses below intercept.
+      Assert.fail("Expected IllegalArgumentException for non-existent credentials file: " + testFilePath);
+    } catch (IllegalArgumentException iae) {
+      //expected case.
+      String errorMessage = String.format("Invalid credentials file path: %s. File does not exist.", testFilePath);
+      // (actual, expected) order, consistent with the other assertions in this test class.
+      Assert.assertEquals(iae.getMessage(), errorMessage);
+    } catch (Exception e) {
+      Assert.fail("Should not throw other exception", e);
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org