You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:52:00 UTC

[04/10] flink git commit: [FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role

[FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role

Config example:

```
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: <arn>
aws.credentials.provider.role.sessionName: session-name
aws.credentials.provider.role.provider: AUTO
```

[FLINK-9686] [kinesis] Housekeeping: Use early return instead of variable assignment and break

[FLINK-9686] [kinesis] Add dependency on aws-java-sdk-sts

Implicitly (via `Class.forName`) used by `STSProfileCredentialsServiceProvider`.
Due to shading, it is not possible to treat this as a "provided" dependency, as
Maven rewrites the class name with the shaded one, which would force clients to
provide aws-java-sdk-sts shaded in the same way.

[FLINK-9686] [kinesis] Mention new config option in docs

[FLINK-9686] [kinesis] Use `STSAssumeRoleSessionCredentialsProvider` instead

[FLINK-9686] [kinesis] Add constants for new config options

This closes #6221.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/229ed775
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/229ed775
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/229ed775

Branch: refs/heads/master
Commit: 229ed7755c5bddd9856233e019ff3fa8ddef29a7
Parents: a25cd3f
Author: Franz Thoma <fr...@tngtech.com>
Authored: Mon May 14 15:02:12 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |  2 +-
 .../flink-connector-kinesis/pom.xml             |  6 ++
 .../kinesis/config/AWSConfigConstants.java      | 64 ++++++++++++++++--
 .../connectors/kinesis/util/AWSUtil.java        | 68 +++++++++++++-------
 .../kinesis/util/KinesisConfigUtilTest.java     |  7 +-
 5 files changed, 111 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 6a60125..834677d 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -113,7 +113,7 @@ The above is a simple example of using the consumer. Configuration for the consu
 instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
 demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
 the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
-`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, and `AUTO`). Also, data is being consumed
+`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
 from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 690c4f8..62b9539 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -98,6 +98,12 @@ under the License.
 
 		<dependency>
 			<groupId>com.amazonaws</groupId>
+			<artifactId>aws-java-sdk-sts</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.amazonaws</groupId>
 			<artifactId>amazon-kinesis-producer</artifactId>
 			<version>${aws.kinesis-kpl.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
index f3ff52b..557e846 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -45,6 +45,9 @@ public class AWSConfigConstants {
 		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
 		BASIC,
 
+		/** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
+		ASSUME_ROLE,
+
 		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
 		AUTO,
 	}
@@ -52,22 +55,69 @@ public class AWSConfigConstants {
 	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
 	public static final String AWS_REGION = "aws.region";
 
+	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
+	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
 	/** The AWS access key ID to use when setting credentials provider type to BASIC. */
-	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+	public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
 
 	/** The AWS secret key to use when setting credentials provider type to BASIC. */
-	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
-	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
-	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+	public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
 
 	/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
-	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+	public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
 
 	/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
-	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+	public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
+
+	/** The role ARN to use when credential provider type is set to ASSUME_ROLE. */
+	public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
+
+	/** The role session name to use when credential provider type is set to ASSUME_ROLE. */
+	public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
+
+	/** The external ID to use when credential provider type is set to ASSUME_ROLE. */
+	public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);
+
+	/**
+	 * The credentials provider that provides credentials for assuming the role when credential
+	 * provider type is set to ASSUME_ROLE.
+	 * Roles can be nested, so AWS_ROLE_CREDENTIALS_PROVIDER can again be set to "ASSUME_ROLE"
+	 */
+	public static final String AWS_ROLE_CREDENTIALS_PROVIDER = roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
 
 	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
 	public static final String AWS_ENDPOINT = "aws.endpoint";
 
+	public static String accessKeyId(String prefix) {
+		return prefix + ".basic.accesskeyid";
+	}
+
+	public static String secretKey(String prefix) {
+		return prefix + ".basic.secretkey";
+	}
+
+	public static String profilePath(String prefix) {
+		return prefix + ".profile.path";
+	}
+
+	public static String profileName(String prefix) {
+		return prefix + ".profile.name";
+	}
+
+	public static String roleArn(String prefix) {
+		return prefix + ".role.arn";
+	}
+
+	public static String roleSessionName(String prefix) {
+		return prefix + ".role.sessionName";
+	}
+
+	public static String externalId(String prefix) {
+		return prefix + ".role.externalId";
+	}
+
+	public static String roleCredentialsProvider(String prefix) {
+		return prefix + ".role.provider";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 2678c90..9e5c6cb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -29,6 +29,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
@@ -41,6 +42,8 @@ import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
 import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
 import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import com.fasterxml.jackson.databind.deser.DeserializerFactory;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -99,10 +102,24 @@ public class AWSUtil {
 	 * @return The corresponding AWS Credentials Provider instance
 	 */
 	public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
+		return getCredentialsProvider(configProps, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+	}
+
+	/**
+	 * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
+	 * recursively.
+	 *
+	 * @param configProps the configuration properties
+	 * @param configPrefix the prefix of the config properties for this credentials provider,
+	 *                     e.g. aws.credentials.provider for the base credentials provider,
+	 *                     aws.credentials.provider.role.provider for the credentials provider
+	 *                     for assuming a role, and so on.
+	 */
+	private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
 		CredentialProvider credentialProviderType;
-		if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
-			if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
-				&& configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+		if (!configProps.containsKey(configPrefix)) {
+			if (configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
+				&& configProps.containsKey(AWSConfigConstants.secretKey(configPrefix))) {
 				// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
 				credentialProviderType = CredentialProvider.BASIC;
 			} else {
@@ -110,35 +127,32 @@ public class AWSUtil {
 				credentialProviderType = CredentialProvider.AUTO;
 			}
 		} else {
-			credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
-				AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
+			credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(configPrefix));
 		}
 
-		AWSCredentialsProvider credentialsProvider;
-
 		switch (credentialProviderType) {
 			case ENV_VAR:
-				credentialsProvider = new EnvironmentVariableCredentialsProvider();
-				break;
+				return new EnvironmentVariableCredentialsProvider();
+
 			case SYS_PROP:
-				credentialsProvider = new SystemPropertiesCredentialsProvider();
-				break;
+				return new SystemPropertiesCredentialsProvider();
+
 			case PROFILE:
 				String profileName = configProps.getProperty(
-					AWSConfigConstants.AWS_PROFILE_NAME, null);
+						AWSConfigConstants.profileName(configPrefix), null);
 				String profileConfigPath = configProps.getProperty(
-					AWSConfigConstants.AWS_PROFILE_PATH, null);
-				credentialsProvider = (profileConfigPath == null)
+						AWSConfigConstants.profilePath(configPrefix), null);
+				return (profileConfigPath == null)
 					? new ProfileCredentialsProvider(profileName)
 					: new ProfileCredentialsProvider(profileConfigPath, profileName);
-				break;
+
 			case BASIC:
-				credentialsProvider = new AWSCredentialsProvider() {
+				return new AWSCredentialsProvider() {
 					@Override
 					public AWSCredentials getCredentials() {
 						return new BasicAWSCredentials(
-							configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
-							configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
+							configProps.getProperty(AWSConfigConstants.accessKeyId(configPrefix)),
+							configProps.getProperty(AWSConfigConstants.secretKey(configPrefix)));
 					}
 
 					@Override
@@ -146,13 +160,23 @@ public class AWSUtil {
 						// do nothing
 					}
 				};
-				break;
+
+			case ASSUME_ROLE:
+				final AWSSecurityTokenService baseCredentials = AWSSecurityTokenServiceClientBuilder.standard()
+						.withCredentials(getCredentialsProvider(configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix)))
+						.withRegion(configProps.getProperty(AWSConfigConstants.AWS_REGION))
+						.build();
+				return new STSAssumeRoleSessionCredentialsProvider.Builder(
+						configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)),
+						configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix)))
+						.withExternalId(configProps.getProperty(AWSConfigConstants.externalId(configPrefix)))
+						.withStsClient(baseCredentials)
+						.build();
+
 			default:
 			case AUTO:
-				credentialsProvider = new DefaultAWSCredentialsProviderChain();
+				return new DefaultAWSCredentialsProviderChain();
 		}
-
-		return credentialsProvider;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/229ed775/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 7d05783..c4bfa17 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -324,12 +324,7 @@ public class KinesisConfigUtilTest {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
 
-		try {
-			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test