You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:52:00 UTC
[04/10] flink git commit: [FLINK-9686] [kinesis] Allow creating AWS
credentials by assuming a role
[FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role
Config example:
```
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: <arn>
aws.credentials.provider.role.sessionName: session-name
aws.credentials.provider.role.provider: AUTO
```
[FLINK-9686] [kinesis] Housekeeping: Use early return instead of variable assignment and break
[FLINK-9686] [kinesis] Add dependency on aws-java-sdk-sts
The aws-java-sdk-sts dependency is used implicitly (via `Class.forName`) by
`STSProfileCredentialsServiceProvider`. Because of shading, it cannot be declared
as a "provided" dependency: Maven rewrites the referenced class name to the shaded
one, which would force clients to provide aws-java-sdk-sts shaded in the same way.
[FLINK-9686] [kinesis] Mention new config option in docs
[FLINK-9686] [kinesis] Use `STSAssumeRoleSessionCredentialsProvider` instead
[FLINK-9686] [kinesis] Add constants for new config options
This closes #6221.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/229ed775
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/229ed775
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/229ed775
Branch: refs/heads/master
Commit: 229ed7755c5bddd9856233e019ff3fa8ddef29a7
Parents: a25cd3f
Author: Franz Thoma <fr...@tngtech.com>
Authored: Mon May 14 15:02:12 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200
----------------------------------------------------------------------
docs/dev/connectors/kinesis.md | 2 +-
.../flink-connector-kinesis/pom.xml | 6 ++
.../kinesis/config/AWSConfigConstants.java | 64 ++++++++++++++++--
.../connectors/kinesis/util/AWSUtil.java | 68 +++++++++++++-------
.../kinesis/util/KinesisConfigUtilTest.java | 7 +-
5 files changed, 111 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 6a60125..834677d 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -113,7 +113,7 @@ The above is a simple example of using the consumer. Configuration for the consu
instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
-`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, and `AUTO`). Also, data is being consumed
+`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 690c4f8..62b9539 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -98,6 +98,12 @@ under the License.
<dependency>
<groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sts</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis-kpl.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
index f3ff52b..557e846 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -45,6 +45,9 @@ public class AWSConfigConstants {
/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
BASIC,
+ /** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
+ ASSUME_ROLE,
+
/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
AUTO,
}
@@ -52,22 +55,69 @@ public class AWSConfigConstants {
/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
public static final String AWS_REGION = "aws.region";
+ /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
+ public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
/** The AWS access key ID to use when setting credentials provider type to BASIC. */
- public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+ public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
/** The AWS secret key to use when setting credentials provider type to BASIC. */
- public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
- /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
- public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+ public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
- public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+ public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
- public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+ public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
+
+ /** The role ARN to use when credential provider type is set to ASSUME_ROLE. */
+ public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
+
+ /** The role session name to use when credential provider type is set to ASSUME_ROLE. */
+ public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
+
+ /** The external ID to use when credential provider type is set to ASSUME_ROLE. */
+ public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);
+
+ /**
+ * The credentials provider that provides credentials for assuming the role when credential
+ * provider type is set to ASSUME_ROLE.
+ * Roles can be nested, so AWS_ROLE_CREDENTIALS_PROVIDER can again be set to "ASSUME_ROLE"
+ */
+ public static final String AWS_ROLE_CREDENTIALS_PROVIDER = roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
public static final String AWS_ENDPOINT = "aws.endpoint";
+ public static String accessKeyId(String prefix) {
+ return prefix + ".basic.accesskeyid";
+ }
+
+ public static String secretKey(String prefix) {
+ return prefix + ".basic.secretkey";
+ }
+
+ public static String profilePath(String prefix) {
+ return prefix + ".profile.path";
+ }
+
+ public static String profileName(String prefix) {
+ return prefix + ".profile.name";
+ }
+
+ public static String roleArn(String prefix) {
+ return prefix + ".role.arn";
+ }
+
+ public static String roleSessionName(String prefix) {
+ return prefix + ".role.sessionName";
+ }
+
+ public static String externalId(String prefix) {
+ return prefix + ".role.externalId";
+ }
+
+ public static String roleCredentialsProvider(String prefix) {
+ return prefix + ".role.provider";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 2678c90..9e5c6cb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -29,6 +29,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
@@ -41,6 +42,8 @@ import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.deser.DeserializerFactory;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import java.io.IOException;
import java.util.HashMap;
@@ -99,10 +102,24 @@ public class AWSUtil {
* @return The corresponding AWS Credentials Provider instance
*/
public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
+ return getCredentialsProvider(configProps, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+ }
+
+ /**
+ * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
+ * recursively.
+ *
+ * @param configProps the configuration properties
+ * @param configPrefix the prefix of the config properties for this credentials provider,
+ * e.g. aws.credentials.provider for the base credentials provider,
+ * aws.credentials.provider.role.provider for the credentials provider
+ * for assuming a role, and so on.
+ */
+ private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
CredentialProvider credentialProviderType;
- if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
- if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
- && configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+ if (!configProps.containsKey(configPrefix)) {
+ if (configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
+ && configProps.containsKey(AWSConfigConstants.secretKey(configPrefix))) {
// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
credentialProviderType = CredentialProvider.BASIC;
} else {
@@ -110,35 +127,32 @@ public class AWSUtil {
credentialProviderType = CredentialProvider.AUTO;
}
} else {
- credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
- AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
+ credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(configPrefix));
}
- AWSCredentialsProvider credentialsProvider;
-
switch (credentialProviderType) {
case ENV_VAR:
- credentialsProvider = new EnvironmentVariableCredentialsProvider();
- break;
+ return new EnvironmentVariableCredentialsProvider();
+
case SYS_PROP:
- credentialsProvider = new SystemPropertiesCredentialsProvider();
- break;
+ return new SystemPropertiesCredentialsProvider();
+
case PROFILE:
String profileName = configProps.getProperty(
- AWSConfigConstants.AWS_PROFILE_NAME, null);
+ AWSConfigConstants.profileName(configPrefix), null);
String profileConfigPath = configProps.getProperty(
- AWSConfigConstants.AWS_PROFILE_PATH, null);
- credentialsProvider = (profileConfigPath == null)
+ AWSConfigConstants.profilePath(configPrefix), null);
+ return (profileConfigPath == null)
? new ProfileCredentialsProvider(profileName)
: new ProfileCredentialsProvider(profileConfigPath, profileName);
- break;
+
case BASIC:
- credentialsProvider = new AWSCredentialsProvider() {
+ return new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(
- configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
- configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
+ configProps.getProperty(AWSConfigConstants.accessKeyId(configPrefix)),
+ configProps.getProperty(AWSConfigConstants.secretKey(configPrefix)));
}
@Override
@@ -146,13 +160,23 @@ public class AWSUtil {
// do nothing
}
};
- break;
+
+ case ASSUME_ROLE:
+ final AWSSecurityTokenService baseCredentials = AWSSecurityTokenServiceClientBuilder.standard()
+ .withCredentials(getCredentialsProvider(configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix)))
+ .withRegion(configProps.getProperty(AWSConfigConstants.AWS_REGION))
+ .build();
+ return new STSAssumeRoleSessionCredentialsProvider.Builder(
+ configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)),
+ configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix)))
+ .withExternalId(configProps.getProperty(AWSConfigConstants.externalId(configPrefix)))
+ .withStsClient(baseCredentials)
+ .build();
+
default:
case AUTO:
- credentialsProvider = new DefaultAWSCredentialsProviderChain();
+ return new DefaultAWSCredentialsProviderChain();
}
-
- return credentialsProvider;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 7d05783..c4bfa17 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -324,12 +324,7 @@ public class KinesisConfigUtilTest {
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
- try {
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test