You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/07/27 16:06:16 UTC
[pinot] branch master updated: Add support for IAM role based credentials in Kinesis Plugin (#9071)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 49c0e24e3d Add support for IAM role based credentials in Kinesis Plugin (#9071)
49c0e24e3d is described below
commit 49c0e24e3d85a97911a80352f93e8ebf34214812
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jul 27 21:36:08 2022 +0530
Add support for IAM role based credentials in Kinesis Plugin (#9071)
* Add support for IAM roles
* Add support for externalId
* Provide proper credentials to STS client
* Add default session id
* Add javadoc
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 87 ++++++++++++++++++----
.../stream/kinesis/KinesisConnectionHandler.java | 53 +++++++++++--
2 files changed, 119 insertions(+), 21 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 95221ff3bf..d8a3795a2a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.util.Map;
+import java.util.UUID;
import org.apache.pinot.spi.stream.StreamConfig;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -36,9 +38,32 @@ public class KinesisConfig {
public static final String MAX_RECORDS_TO_FETCH = "maxRecordsToFetch";
public static final String ENDPOINT = "endpoint";
+ // IAM role configs
+ /**
+ * Enable Role based access to AWS.
+ * iamRoleBasedAccessEnabled - Set it to `true` to enable role based access, default: false
+ * roleArn - Required when iamRoleBasedAccessEnabled is `true`. Specify the ARN of the role the client should assume.
+ * roleSessionName - session name to be used when creating a role based session. default: pinot-kinesis-uuid
+ * externalId - string external id value required by role's policy. default: null
+ * sessionDurationSeconds - The duration, in seconds, of the role session. Default: 900
+ * asyncSessionUpdateEnabled -
+ * Configure whether the provider should fetch credentials asynchronously in the background.
+ * If this is true, threads are less likely to block when credentials are loaded,
+ * but additional resources are used to maintain the provider. Default - `true`
+ */
+ public static final String IAM_ROLE_BASED_ACCESS_ENABLED = "iamRoleBasedAccessEnabled";
+ public static final String ROLE_ARN = "roleArn";
+ public static final String ROLE_SESSION_NAME = "roleSessionName";
+ public static final String EXTERNAL_ID = "externalId";
+ public static final String SESSION_DURATION_SECONDS = "sessionDurationSeconds";
+ public static final String ASYNC_SESSION_UPDATED_ENABLED = "asyncSessionUpdateEnabled";
+
// TODO: this is a starting point, until a better default is figured out
public static final String DEFAULT_MAX_RECORDS = "20";
public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString();
+ public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
+ public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
+ public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
private final String _streamTopicName;
private final String _awsRegion;
@@ -48,6 +73,14 @@ public class KinesisConfig {
private final String _secretKey;
private final String _endpoint;
+ // IAM Role values
+ private boolean _iamRoleBasedAccess;
+ private String _roleArn;
+ private String _roleSessionName;
+ private String _externalId;
+ private int _sessionDurationSeconds;
+ private boolean _asyncSessionUpdateEnabled;
+
public KinesisConfig(StreamConfig streamConfig) {
Map<String, String> props = streamConfig.getStreamConfigsMap();
_streamTopicName = streamConfig.getTopicName();
@@ -60,23 +93,23 @@ public class KinesisConfig {
_accessKey = props.get(ACCESS_KEY);
_secretKey = props.get(SECRET_KEY);
_endpoint = props.get(ENDPOINT);
- }
- public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
- String secretKey, String endpoint) {
- this(streamTopicName, awsRegion, shardIteratorType, accessKey, secretKey, Integer.parseInt(DEFAULT_MAX_RECORDS),
- endpoint);
- }
+ _iamRoleBasedAccess =
+ Boolean.parseBoolean(props.getOrDefault(IAM_ROLE_BASED_ACCESS_ENABLED, DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED));
+ _roleArn = props.get(ROLE_ARN);
+ _roleSessionName =
+ props.getOrDefault(ROLE_SESSION_NAME, Joiner.on("-").join("pinot", "kinesis", UUID.randomUUID()));
+ _externalId = props.get(EXTERNAL_ID);
+ _sessionDurationSeconds =
+ Integer.parseInt(props.getOrDefault(SESSION_DURATION_SECONDS, DEFAULT_SESSION_DURATION_SECONDS));
+ _asyncSessionUpdateEnabled =
+ Boolean.parseBoolean(props.getOrDefault(ASYNC_SESSION_UPDATED_ENABLED, DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
- public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
- String secretKey, int maxRecords, String endpoint) {
- _streamTopicName = streamTopicName;
- _awsRegion = awsRegion;
- _shardIteratorType = shardIteratorType;
- _accessKey = accessKey;
- _secretKey = secretKey;
- _numMaxRecordsToFetch = maxRecords;
- _endpoint = endpoint;
+ if (_iamRoleBasedAccess) {
+ Preconditions.checkNotNull(_roleArn,
+ "Must provide 'roleArn' in stream config for table %s if iamRoleBasedAccess is enabled",
+ streamConfig.getTableNameWithType());
+ }
}
public String getStreamTopicName() {
@@ -106,4 +139,28 @@ public class KinesisConfig {
public String getEndpoint() {
return _endpoint;
}
+
+ public boolean isIamRoleBasedAccess() {
+ return _iamRoleBasedAccess;
+ }
+
+ public String getRoleArn() {
+ return _roleArn;
+ }
+
+ public String getRoleSessionName() {
+ return _roleSessionName;
+ }
+
+ public String getExternalId() {
+ return _externalId;
+ }
+
+ public int getSessionDurationSeconds() {
+ return _sessionDurationSeconds;
+ }
+
+ public boolean isAsyncSessionUpdateEnabled() {
+ return _asyncSessionUpdateEnabled;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index dd7bf36329..7fbd0e28c2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
@@ -33,6 +34,9 @@ import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
/**
@@ -45,6 +49,7 @@ public class KinesisConnectionHandler {
private final String _accessKey;
private final String _secretKey;
private final String _endpoint;
+ private final KinesisConfig _kinesisConfig;
public KinesisConnectionHandler(KinesisConfig kinesisConfig) {
_stream = kinesisConfig.getStreamTopicName();
@@ -52,6 +57,7 @@ public class KinesisConnectionHandler {
_accessKey = kinesisConfig.getAccessKey();
_secretKey = kinesisConfig.getSecretKey();
_endpoint = kinesisConfig.getEndpoint();
+ _kinesisConfig = kinesisConfig;
createConnection();
}
@@ -62,6 +68,7 @@ public class KinesisConnectionHandler {
_accessKey = kinesisConfig.getAccessKey();
_secretKey = kinesisConfig.getSecretKey();
_endpoint = kinesisConfig.getEndpoint();
+ _kinesisConfig = kinesisConfig;
_kinesisClient = kinesisClient;
}
@@ -80,17 +87,51 @@ public class KinesisConnectionHandler {
public void createConnection() {
if (_kinesisClient == null) {
KinesisClientBuilder kinesisClientBuilder;
+
+ AwsCredentialsProvider awsCredentialsProvider;
if (StringUtils.isNotBlank(_accessKey) && StringUtils.isNotBlank(_secretKey)) {
AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(_accessKey, _secretKey);
- kinesisClientBuilder = KinesisClient.builder().region(Region.of(_region))
- .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
- .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+ awsCredentialsProvider = StaticCredentialsProvider.create(awsBasicCredentials);
} else {
- kinesisClientBuilder =
- KinesisClient.builder().region(Region.of(_region)).credentialsProvider(DefaultCredentialsProvider.create())
- .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+ awsCredentialsProvider = DefaultCredentialsProvider.create();
+ }
+
+ if (_kinesisConfig.isIamRoleBasedAccess()) {
+ AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+ AssumeRoleRequest.builder()
+ .roleArn(_kinesisConfig.getRoleArn())
+ .roleSessionName(_kinesisConfig.getRoleSessionName())
+ .durationSeconds(_kinesisConfig.getSessionDurationSeconds());
+
+ AssumeRoleRequest assumeRoleRequest;
+ if (StringUtils.isNotEmpty(_kinesisConfig.getExternalId())) {
+ assumeRoleRequest = assumeRoleRequestBuilder
+ .externalId(_kinesisConfig.getExternalId())
+ .build();
+ } else {
+ assumeRoleRequest = assumeRoleRequestBuilder.build();
+ }
+
+ StsClient stsClient =
+ StsClient.builder()
+ .region(Region.of(_region))
+ .credentialsProvider(awsCredentialsProvider)
+ .build();
+
+ awsCredentialsProvider =
+ StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .refreshRequest(assumeRoleRequest)
+ .asyncCredentialUpdateEnabled(_kinesisConfig.isAsyncSessionUpdateEnabled())
+ .build();
}
+ kinesisClientBuilder =
+ KinesisClient.builder()
+ .region(Region.of(_region))
+ .credentialsProvider(awsCredentialsProvider)
+ .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+
if (StringUtils.isNotBlank(_endpoint)) {
try {
kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(_endpoint));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org