You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/12/18 09:39:58 UTC

(pinot) branch master updated: [feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fd45c1bbd2 [feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)
fd45c1bbd2 is described below

commit fd45c1bbd273dab29aebe4576cf0475d219e2fca
Author: Jeffrey Bolle <Je...@users.noreply.github.com>
AuthorDate: Mon Dec 18 04:39:52 2023 -0500

    [feature] add support for StreamNative OAuth2 authentication for pulsar. (#12068)
    
    * [feature] add support for StreamNative OAuth2 authentication for pulsar.
    
    Addresses #12067
    
    * add comments documenting new params.
    
    * Add validation for provided OAuth2 config.
    
    * checkstyle
    
    * Added todo per PR comments.
---
 .../pinot/plugin/stream/pulsar/PulsarConfig.java   | 87 +++++++++++++++++-----
 .../PulsarPartitionLevelConnectionHandler.java     | 26 +++++++
 .../plugin/stream/pulsar/PulsarConfigTest.java     | 54 ++++++++++++++
 3 files changed, 148 insertions(+), 19 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 8fefc0e7c5..8cdc8f8647 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -19,6 +19,9 @@
 package org.apache.pinot.plugin.stream.pulsar;
 
 import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -42,6 +45,10 @@ public class PulsarConfig {
   public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
   public static final String AUTHENTICATION_TOKEN = "authenticationToken";
   public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
+
+  public static final String OAUTH_ISSUER_URL = "issuerUrl";
+  public static final String OAUTH_CREDS_FILE_PATH = "credsFilePath";
+  public static final String OAUTH_AUDIENCE = "audience";
   public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
   public static final String METADATA_FIELDS = "metadata.fields"; //list of the metadata fields comma separated
 
@@ -52,6 +59,15 @@ public class PulsarConfig {
   private final SubscriptionInitialPosition _subscriptionInitialPosition;
   private final String _authenticationToken;
   private final String _tlsTrustCertsFilePath;
+
+  private final String _issuerUrl; // OAUTH2 issuer URL example: "https://auth.streamnative.cloud"
+
+  // Absolute path of your downloaded key file on the local file system.
+  // example: file:///path/to/private_creds_file
+  //TODO: find a good way to support pushing this secret to all servers.
+  private final String _credentialsFilePath;
+  private final String _audience; // Audience for your OAUTH2 client: urn:sn:pulsar:test:test-cluster
+
   // Deprecated since pulsar supports record key extraction
   @Deprecated
   private final boolean _enableKeyValueStitch;
@@ -59,38 +75,59 @@ public class PulsarConfig {
   private final Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> _metadataFields;
   public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
-    _pulsarTopicName = streamConfig.getTopicName();
-    _bootstrapServers =
-        streamConfigMap.get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, BOOTSTRAP_SERVERS));
     _subscriberId = subscriberId;
 
-    String authenticationTokenKey = StreamConfigProperties.constructStreamProperty(STREAM_TYPE, AUTHENTICATION_TOKEN);
-    _authenticationToken = streamConfigMap.get(authenticationTokenKey);
-
-    String tlsTrustCertsFilePathKey = StreamConfigProperties.
-        constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
-    _tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
-
-    String enableKeyValueStitchKey = StreamConfigProperties.
-        constructStreamProperty(STREAM_TYPE, ENABLE_KEY_VALUE_STITCH);
-    _enableKeyValueStitch = Boolean.parseBoolean(streamConfigMap.get(enableKeyValueStitchKey));
-
+    _pulsarTopicName = streamConfig.getTopicName();
+    _bootstrapServers = getConfigValue(streamConfigMap, BOOTSTRAP_SERVERS);
     Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");
 
+    _authenticationToken = getConfigValue(streamConfigMap, AUTHENTICATION_TOKEN);
+    _tlsTrustCertsFilePath = getConfigValue(streamConfigMap, TLS_TRUST_CERTS_FILE_PATH);
+    _enableKeyValueStitch = Boolean.parseBoolean(getConfigValue(streamConfigMap, ENABLE_KEY_VALUE_STITCH));
+
     OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
 
     _subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria);
     _initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria);
-    _populateMetadata = Boolean.parseBoolean(streamConfig.getStreamConfigsMap().getOrDefault(
-        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
-        "false"));
-    String metadataFieldsToExtractCSV = streamConfig.getStreamConfigsMap().getOrDefault(
-        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, METADATA_FIELDS), "");
+    _populateMetadata = Boolean.parseBoolean(getConfigValueOrDefault(streamConfigMap,
+        StreamConfigProperties.METADATA_POPULATE, "false"));
+    String metadataFieldsToExtractCSV = getConfigValueOrDefault(streamConfigMap, METADATA_FIELDS, "");
     if (StringUtils.isBlank(metadataFieldsToExtractCSV) || !_populateMetadata) {
       _metadataFields = Collections.emptySet();
     } else {
       _metadataFields = parseConfigStringToEnumSet(metadataFieldsToExtractCSV);
     }
+    _issuerUrl = getConfigValue(streamConfigMap, OAUTH_ISSUER_URL);
+    _credentialsFilePath = getConfigValue(streamConfigMap, OAUTH_CREDS_FILE_PATH);
+    if (StringUtils.isNotBlank(_credentialsFilePath)) {
+      validateOAuthCredFile();
+    }
+    _audience = getConfigValue(streamConfigMap, OAUTH_AUDIENCE);
+  }
+
+  // Rejects a credentials path that is not a parseable file:// URL or whose target file does not exist.
+  protected void validateOAuthCredFile() {
+    String invalidPathMessage = "Invalid credentials file path: " + _credentialsFilePath;
+    URL credFileUrl;
+    try {
+      credFileUrl = new URL(_credentialsFilePath);
+    } catch (MalformedURLException mue) {
+      throw new IllegalArgumentException(invalidPathMessage, mue);
+    }
+    if (!"file".equals(credFileUrl.getProtocol())) {
+      throw new IllegalArgumentException(invalidPathMessage + ". URL protocol must be file://");
+    }
+    if (!new File(credFileUrl.getPath()).exists()) {
+      throw new IllegalArgumentException(invalidPathMessage + ". File does not exist.");
+    }
+  }
+
+  private String getConfigValue(Map<String, String> streamConfigMap, String key) {
+    return streamConfigMap.get(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, key));
+  }
+
+  private String getConfigValueOrDefault(Map<String, String> streamConfigMap, String key, String defaultValue) {
+    return streamConfigMap.getOrDefault(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, key), defaultValue);
   }
 
   private Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> parseConfigStringToEnumSet(
@@ -144,4 +181,16 @@ public class PulsarConfig {
   public Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> getMetadataFields() {
     return _metadataFields;
   }
+
+  public String getIssuerUrl() {
+    return _issuerUrl;
+  }
+
+  public String getCredentialsFilePath() {
+    return _credentialsFilePath;
+  }
+
+  public String getAudience() {
+    return _audience;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 11033ec716..163400bc3b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -19,7 +19,11 @@
 package org.apache.pinot.plugin.stream.pulsar;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.List;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -27,6 +31,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +65,7 @@ public class PulsarPartitionLevelConnectionHandler {
         pulsarClientBuilder.authentication(authentication);
       }
 
+      getAuthenticationFactory(_config).ifPresent(pulsarClientBuilder::authentication);
       _pulsarClient = pulsarClientBuilder.build();
       LOGGER.info("Created pulsar client {}", _pulsarClient);
     } catch (Exception e) {
@@ -67,6 +73,26 @@ public class PulsarPartitionLevelConnectionHandler {
     }
   }
 
+  /**
+   * Builds OAuth2 client-credentials authentication when issuer URL, audience and credentials file are all set.
+   * Returns empty when any is blank, or (after logging) when the issuer/credentials value is not a valid URL.
+   */
+  private Optional<Authentication> getAuthenticationFactory(PulsarConfig pulsarConfig) {
+    // OAuth2 is only configured when issuer URL, audience and credentials file path are all provided.
+    if (StringUtils.isNotBlank(pulsarConfig.getIssuerUrl())
+        && StringUtils.isNotBlank(pulsarConfig.getAudience())
+        && StringUtils.isNotBlank(pulsarConfig.getCredentialsFilePath())) {
+      try {
+        return Optional.of(AuthenticationFactoryOAuth2.clientCredentials(new URL(pulsarConfig.getIssuerUrl()),
+            new URL(pulsarConfig.getCredentialsFilePath()), pulsarConfig.getAudience()));
+      } catch (MalformedURLException mue) {
+        LOGGER.error("Failed to create authentication factory for pulsar client with config: "
+                + "issuer: {}, credential file path: {}, audience: {}", pulsarConfig.getIssuerUrl(),
+            pulsarConfig.getCredentialsFilePath(), pulsarConfig.getAudience(), mue);
+      }
+    }
+    return Optional.empty();
+  }
   protected Reader<byte[]> createReaderForPartition(String topic, int partition, MessageId initialMessageId) {
     if (_pulsarClient == null) {
       throw new RuntimeException("Failed to create reader as no pulsar client found for topic " + topic);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
index badbbbc5f7..3c7eeffe80 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
@@ -19,8 +19,12 @@
 package org.apache.pinot.plugin.stream.pulsar;
 
 import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -113,4 +117,54 @@ public class PulsarConfigTest {
     Assert.assertFalse(pulsarConfig.isPopulateMetadata());
     Assert.assertEquals(metadataFieldsToExtract.size(), 0);
   }
+
+  // Verifies that the OAuth2 issuer URL, credentials file path and audience are read from the stream config.
+  @Test
+  public void testParsingConfigForOAuth() throws Exception {
+    Path testFile = null;
+    try {
+      testFile = Files.createTempFile("test_cred_file", ".json");
+      String credsFileUrl = "file://" + testFile.toFile().getAbsolutePath();
+      Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_ISSUER_URL),
+          "http://auth.test.com");
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+          PulsarConfig.OAUTH_CREDS_FILE_PATH), credsFileUrl);
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_AUDIENCE),
+          "urn:test:test");
+
+      PulsarConfig pulsarConfig = new PulsarConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap), "testId");
+      Assert.assertEquals(pulsarConfig.getIssuerUrl(), "http://auth.test.com");
+      Assert.assertEquals(pulsarConfig.getCredentialsFilePath(), credsFileUrl);
+      Assert.assertEquals(pulsarConfig.getAudience(), "urn:test:test");
+    } catch (Exception e) {
+      Assert.fail("Should not throw exception", e);
+    } finally {
+      Optional.ofNullable(testFile).map(Path::toFile).ifPresent(File::delete);
+    }
+  }
+
+  // Verifies that a credentials URL pointing to a non-existent file is rejected while building the config.
+  @Test
+  public void testParsingConfigFailFileValidationForOAuth() throws Exception {
+    String testFilePath = "file://path/to/file.json";
+
+    try {
+      Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_ISSUER_URL),
+          "http://auth.test.com");
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+          PulsarConfig.OAUTH_CREDS_FILE_PATH), testFilePath);
+      streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, PulsarConfig.OAUTH_AUDIENCE),
+          "urn:test:test");
+      new PulsarConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap), "testId"); // expected to throw
+      // Reaching this line means validation was skipped; fail so the test cannot pass silently.
+      Assert.fail("Expected IllegalArgumentException for missing credentials file");
+    } catch (IllegalArgumentException iae) {
+      String errorMessage = String.format("Invalid credentials file path: %s. File does not exist.", testFilePath);
+      Assert.assertEquals(iae.getMessage(), errorMessage);
+    } catch (Exception e) {
+      Assert.fail("Should not throw other exception", e);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org