You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/07/27 16:06:16 UTC

[pinot] branch master updated: Add support for IAM role based credentials in Kinesis Plugin (#9071)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 49c0e24e3d Add support for IAM role based credentials in Kinesis Plugin (#9071)
49c0e24e3d is described below

commit 49c0e24e3d85a97911a80352f93e8ebf34214812
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jul 27 21:36:08 2022 +0530

    Add support for IAM role based credentials in Kinesis Plugin (#9071)
    
    * Add support for IAM roles
    
    * Add support for externalId
    
    * Provide proper credentials to STS client
    
    * Add default session id
    
    * Add javadoc
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 87 ++++++++++++++++++----
 .../stream/kinesis/KinesisConnectionHandler.java   | 53 +++++++++++--
 2 files changed, 119 insertions(+), 21 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 95221ff3bf..d8a3795a2a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.pinot.spi.stream.StreamConfig;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
@@ -36,9 +38,32 @@ public class KinesisConfig {
   public static final String MAX_RECORDS_TO_FETCH = "maxRecordsToFetch";
   public static final String ENDPOINT = "endpoint";
 
+  // IAM role configs
+  /**
+   * Enable Role based access to AWS.
+   * iamRoleBasedAccessEnabled - Set it to `true` to enable role based access, default: false
+   * roleArn - Required if iamRoleBasedAccessEnabled is true. The ARN of the role the client should assume.
+   * roleSessionName - session name to be used when creating a role based session. default: pinot-kinesis-<uuid>
+   * externalId - string external id value required by role's policy. default: null
+   * sessionDurationSeconds - The duration, in seconds, of the role session. Default: 900
+   * asyncSessionUpdateEnabled -
+   *       Configure whether the provider should fetch credentials asynchronously in the background.
+   *       If this is true, threads are less likely to block when credentials are loaded,
+   *       but additional resources are used to maintain the provider. Default: true
+   */
+  public static final String IAM_ROLE_BASED_ACCESS_ENABLED = "iamRoleBasedAccessEnabled";
+  public static final String ROLE_ARN = "roleArn";
+  public static final String ROLE_SESSION_NAME = "roleSessionName";
+  public static final String EXTERNAL_ID = "externalId";
+  public static final String SESSION_DURATION_SECONDS = "sessionDurationSeconds";
+  public static final String ASYNC_SESSION_UPDATED_ENABLED = "asyncSessionUpdateEnabled";
+
   // TODO: this is a starting point, until a better default is figured out
   public static final String DEFAULT_MAX_RECORDS = "20";
   public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString();
+  public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
+  public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
+  public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
 
   private final String _streamTopicName;
   private final String _awsRegion;
@@ -48,6 +73,14 @@ public class KinesisConfig {
   private final String _secretKey;
   private final String _endpoint;
 
+  // IAM Role values
+  private boolean _iamRoleBasedAccess;
+  private String _roleArn;
+  private String _roleSessionName;
+  private String _externalId;
+  private int _sessionDurationSeconds;
+  private boolean _asyncSessionUpdateEnabled;
+
   public KinesisConfig(StreamConfig streamConfig) {
     Map<String, String> props = streamConfig.getStreamConfigsMap();
     _streamTopicName = streamConfig.getTopicName();
@@ -60,23 +93,23 @@ public class KinesisConfig {
     _accessKey = props.get(ACCESS_KEY);
     _secretKey = props.get(SECRET_KEY);
     _endpoint = props.get(ENDPOINT);
-  }
 
-  public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
-      String secretKey, String endpoint) {
-    this(streamTopicName, awsRegion, shardIteratorType, accessKey, secretKey, Integer.parseInt(DEFAULT_MAX_RECORDS),
-        endpoint);
-  }
+    _iamRoleBasedAccess =
+        Boolean.parseBoolean(props.getOrDefault(IAM_ROLE_BASED_ACCESS_ENABLED, DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED));
+    _roleArn = props.get(ROLE_ARN);
+    _roleSessionName =
+        props.getOrDefault(ROLE_SESSION_NAME, Joiner.on("-").join("pinot", "kinesis", UUID.randomUUID()));
+    _externalId = props.get(EXTERNAL_ID);
+    _sessionDurationSeconds =
+        Integer.parseInt(props.getOrDefault(SESSION_DURATION_SECONDS, DEFAULT_SESSION_DURATION_SECONDS));
+    _asyncSessionUpdateEnabled =
+        Boolean.parseBoolean(props.getOrDefault(ASYNC_SESSION_UPDATED_ENABLED, DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
 
-  public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
-      String secretKey, int maxRecords, String endpoint) {
-    _streamTopicName = streamTopicName;
-    _awsRegion = awsRegion;
-    _shardIteratorType = shardIteratorType;
-    _accessKey = accessKey;
-    _secretKey = secretKey;
-    _numMaxRecordsToFetch = maxRecords;
-    _endpoint = endpoint;
+    if (_iamRoleBasedAccess) {
+      Preconditions.checkNotNull(_roleArn,
+          "Must provide 'roleArn' in stream config for table %s if iamRoleBasedAccess is enabled",
+          streamConfig.getTableNameWithType());
+    }
   }
 
   public String getStreamTopicName() {
@@ -106,4 +139,28 @@ public class KinesisConfig {
   public String getEndpoint() {
     return _endpoint;
   }
+
+  public boolean isIamRoleBasedAccess() {
+    return _iamRoleBasedAccess;
+  }
+
+  public String getRoleArn() {
+    return _roleArn;
+  }
+
+  public String getRoleSessionName() {
+    return _roleSessionName;
+  }
+
+  public String getExternalId() {
+    return _externalId;
+  }
+
+  public int getSessionDurationSeconds() {
+    return _sessionDurationSeconds;
+  }
+
+  public boolean isAsyncSessionUpdateEnabled() {
+    return _asyncSessionUpdateEnabled;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index dd7bf36329..7fbd0e28c2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
@@ -33,6 +34,9 @@ import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
 import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 
 
 /**
@@ -45,6 +49,7 @@ public class KinesisConnectionHandler {
   private final String _accessKey;
   private final String _secretKey;
   private final String _endpoint;
+  private final KinesisConfig _kinesisConfig;
 
   public KinesisConnectionHandler(KinesisConfig kinesisConfig) {
     _stream = kinesisConfig.getStreamTopicName();
@@ -52,6 +57,7 @@ public class KinesisConnectionHandler {
     _accessKey = kinesisConfig.getAccessKey();
     _secretKey = kinesisConfig.getSecretKey();
     _endpoint = kinesisConfig.getEndpoint();
+    _kinesisConfig = kinesisConfig;
     createConnection();
   }
 
@@ -62,6 +68,7 @@ public class KinesisConnectionHandler {
     _accessKey = kinesisConfig.getAccessKey();
     _secretKey = kinesisConfig.getSecretKey();
     _endpoint = kinesisConfig.getEndpoint();
+    _kinesisConfig = kinesisConfig;
     _kinesisClient = kinesisClient;
   }
 
@@ -80,17 +87,51 @@ public class KinesisConnectionHandler {
   public void createConnection() {
     if (_kinesisClient == null) {
       KinesisClientBuilder kinesisClientBuilder;
+
+      AwsCredentialsProvider awsCredentialsProvider;
       if (StringUtils.isNotBlank(_accessKey) && StringUtils.isNotBlank(_secretKey)) {
         AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(_accessKey, _secretKey);
-        kinesisClientBuilder = KinesisClient.builder().region(Region.of(_region))
-            .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
-            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        awsCredentialsProvider = StaticCredentialsProvider.create(awsBasicCredentials);
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(_region)).credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        awsCredentialsProvider = DefaultCredentialsProvider.create();
+      }
+
+      if (_kinesisConfig.isIamRoleBasedAccess()) {
+        AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+            AssumeRoleRequest.builder()
+                .roleArn(_kinesisConfig.getRoleArn())
+                .roleSessionName(_kinesisConfig.getRoleSessionName())
+                .durationSeconds(_kinesisConfig.getSessionDurationSeconds());
+
+        AssumeRoleRequest assumeRoleRequest;
+        if (StringUtils.isNotEmpty(_kinesisConfig.getExternalId())) {
+          assumeRoleRequest = assumeRoleRequestBuilder
+              .externalId(_kinesisConfig.getExternalId())
+              .build();
+        } else {
+          assumeRoleRequest = assumeRoleRequestBuilder.build();
+        }
+
+        StsClient stsClient =
+            StsClient.builder()
+                .region(Region.of(_region))
+                .credentialsProvider(awsCredentialsProvider)
+                .build();
+
+        awsCredentialsProvider =
+            StsAssumeRoleCredentialsProvider.builder()
+                .stsClient(stsClient)
+                .refreshRequest(assumeRoleRequest)
+                .asyncCredentialUpdateEnabled(_kinesisConfig.isAsyncSessionUpdateEnabled())
+                .build();
       }
 
+      kinesisClientBuilder =
+          KinesisClient.builder()
+              .region(Region.of(_region))
+              .credentialsProvider(awsCredentialsProvider)
+              .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+
       if (StringUtils.isNotBlank(_endpoint)) {
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(_endpoint));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org