Posted to issues@flink.apache.org by fmthoma <gi...@git.apache.org> on 2018/06/28 07:42:40 UTC
[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...
GitHub user fmthoma opened a pull request:
https://github.com/apache/flink/pull/6221
[FLINK-9686] [kinesis] Enable Kinesis authentication via AssumeRole
## What is the purpose of the change
Enable `FlinkKinesisProducer` to authenticate via assuming a role.
### Current situation:
FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials via one of the following mechanisms:
* Environment variables
* System properties
* An AWS profile
* Directly provided credentials (`BASIC`)
* AWS's own default heuristic (`AUTO`)
For streaming across AWS accounts, it is considered good practice to grant access to the remote Kinesis stream via a role, rather than passing credentials for the remote account.
### Proposed change:
Add a new credentials provider specifying a role ARN, session name, and an additional credentials provider supplying the credentials for assuming the role.
Config example for assuming role `<role-arn>` with auto-detected credentials:
```
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: <role-arn>
aws.credentials.provider.role.sessionName: my-session-name
aws.credentials.provider.role.provider: AUTO
```
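In a Flink job, these settings would typically be handed to the `FlinkKinesisProducer` as a `java.util.Properties` object. The following is a minimal sketch of building that object with the key strings from the example above. The class name `AssumeRoleConfigExample`, the method `assumeRoleConfig`, and the region value are illustrative assumptions and not part of this PR.

```java
import java.util.Properties;

// Hypothetical helper class, for illustration only.
class AssumeRoleConfigExample {

    /** Builds producer properties for assuming a role with auto-detected base credentials. */
    static Properties assumeRoleConfig(String roleArn, String sessionName) {
        Properties config = new Properties();
        // Region value is an assumption; use the region of the target stream.
        config.setProperty("aws.region", "us-east-1");
        config.setProperty("aws.credentials.provider", "ASSUME_ROLE");
        config.setProperty("aws.credentials.provider.role.arn", roleArn);
        config.setProperty("aws.credentials.provider.role.sessionName", sessionName);
        // Credentials used for the AssumeRole call itself.
        config.setProperty("aws.credentials.provider.role.provider", "AUTO");
        return config;
    }

    public static void main(String[] args) {
        Properties config = assumeRoleConfig("<role-arn>", "my-session-name");
        // Print the keys in sorted order so the output is stable.
        config.stringPropertyNames().stream().sorted()
            .forEach(key -> System.out.println(key + ": " + config.getProperty(key)));
    }
}
```

The resulting `Properties` object would then be passed to the producer in the same way as any other Kinesis connector configuration.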
`ASSUME_ROLE` credentials providers can be nested, i.e. it is possible to assume a role which in turn is allowed to assume another role:
```
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: <role-arn>
aws.credentials.provider.role.sessionName: my-session-name
aws.credentials.provider.role.provider: ASSUME_ROLE
aws.credentials.provider.role.provider.role.arn: <nested-role-arn>
aws.credentials.provider.role.provider.role.sessionName: my-nested-session-name
aws.credentials.provider.role.provider.role.provider: AUTO
```
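The nesting follows a regular pattern: each additional role appends `.role.provider` to the key prefix of the previous level. As a rough sketch of that pattern (not code from this PR), the keys for a role chain of any depth could be generated as follows. The names `NestedAssumeRoleKeys`, `Role`, and `buildConfig` are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class NestedAssumeRoleKeys {

    /** One role in the chain: its ARN and the session name to use when assuming it. */
    static final class Role {
        final String arn;
        final String sessionName;

        Role(String arn, String sessionName) {
            this.arn = arn;
            this.sessionName = sessionName;
        }
    }

    /**
     * Emits config entries for a chain of roles, outermost role first.
     * The innermost level uses baseProvider (e.g. AUTO) to obtain credentials.
     */
    static Map<String, String> buildConfig(List<Role> roles, String baseProvider) {
        Map<String, String> config = new LinkedHashMap<>();
        String prefix = "aws.credentials.provider";
        for (Role role : roles) {
            config.put(prefix, "ASSUME_ROLE");
            config.put(prefix + ".role.arn", role.arn);
            config.put(prefix + ".role.sessionName", role.sessionName);
            // The next level is configured under the nested provider key.
            prefix = prefix + ".role.provider";
        }
        config.put(prefix, baseProvider);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = buildConfig(
            Arrays.asList(
                new Role("<role-arn>", "my-session-name"),
                new Role("<nested-role-arn>", "my-nested-session-name")),
            "AUTO");
        config.forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

Run with the two roles shown in `main`, this produces the same seven entries as the nested config example above.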
## Brief change log
- Add `aws.credentials.provider` option `ASSUME_ROLE` for authenticating via assuming a role.
## Verifying this change
The changed feature was not previously covered by tests, and non-trivial tests are hard to add. It can be verified manually:
* Create an AWS IAM user and a role, and give the user permissions to assume the role.
* Create a Kinesis stream, and give the role permissions to write to this stream, but not the user.
* Set up the config so that the user's credentials are used to assume the role.
* The Kinesis producer should now be able to write to the stream.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no, although a config option is added.
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs, JavaDocs
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fmthoma/flink enableRoles
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6221.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6221
----
commit 45adaa5a14a3db340fce878db50ce6ed8716fe6c
Author: Franz Thoma <fr...@...>
Date: 2018-05-14T13:02:12Z
[FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role
Config example:
```
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: <arn>
aws.credentials.provider.role.sessionName: session-name
aws.credentials.provider.role.provider: AUTO
```
commit 3189b48a6cac2898f63cbf75105f544cde32ff58
Author: Franz Thoma <fr...@...>
Date: 2018-05-14T13:04:08Z
[FLINK-9686] [kinesis] Housekeeping: Use early return instead of variable assignment and break
commit 6fe344c8adb9d2c0bed0006216105b0a5032da55
Author: Franz Thoma <fr...@...>
Date: 2018-05-14T13:13:06Z
[FLINK-9686] [kinesis] Housekeeping
commit eb404061e9c4f87dbba1d18a3300fc190af5ea92
Author: Franz Thoma <fr...@...>
Date: 2018-05-15T13:33:50Z
[FLINK-9686] [kinesis] Add dependency on aws-java-sdk-sts
Implicitly (via `Class.forName`) used by `STSProfileCredentialsServiceProvider`.
Due to shading, it is not possible to treat this as a "provided" dependency, as
Maven rewrites the class name with the shaded one, which would force clients to
provide aws-java-sdk-sts shaded in the same way.
commit d7ef8b977f379b7178260cbaf7bcdde6b6b3df1a
Author: Franz Thoma <fr...@...>
Date: 2018-06-28T07:42:20Z
[FLINK-9686] [kinesis] Mention new config option in docs
----
---
[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6221
---
[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6221#discussion_r199420811
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java ---
@@ -45,29 +45,63 @@
/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
BASIC,
+ /** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
+ ASSUME_ROLE,
+
/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
AUTO,
}
/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
public static final String AWS_REGION = "aws.region";
+ /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
+ public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
/** The AWS access key ID to use when setting credentials provider type to BASIC. */
- public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+ public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
/** The AWS secret key to use when setting credentials provider type to BASIC. */
- public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
- /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
- public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+ public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
- public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+ public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
- public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+ public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
public static final String AWS_ENDPOINT = "aws.endpoint";
+ public static String accessKeyId(String prefix) {
+ return prefix + ".basic.accesskeyid";
+ }
+
+ public static String secretKey(String prefix) {
+ return prefix + ".basic.secretkey";
+ }
+
+ public static String profilePath(String prefix) {
+ return prefix + ".profile.path";
+ }
+
+ public static String profileName(String prefix) {
+ return prefix + ".profile.name";
+ }
+
+ public static String roleArn(String prefix) {
--- End diff --
Is there a reason to change the way key constants are defined in this class?
I.e., if the previous pattern were followed, users could just use `AWSConfigConstants.AWS_ROLE_ARN` to set a value for the role ARN, and likewise for the other new configurations.
---
[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...
Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:
https://github.com/apache/flink/pull/6221#discussion_r199462901
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java ---
@@ -45,29 +45,63 @@
/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
BASIC,
+ /** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
+ ASSUME_ROLE,
+
/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
AUTO,
}
/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
public static final String AWS_REGION = "aws.region";
+ /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
+ public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
/** The AWS access key ID to use when setting credentials provider type to BASIC. */
- public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+ public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
/** The AWS secret key to use when setting credentials provider type to BASIC. */
- public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
- /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
- public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+ public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
- public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+ public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
- public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+ public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
public static final String AWS_ENDPOINT = "aws.endpoint";
+ public static String accessKeyId(String prefix) {
+ return prefix + ".basic.accesskeyid";
+ }
+
+ public static String secretKey(String prefix) {
+ return prefix + ".basic.secretkey";
+ }
+
+ public static String profilePath(String prefix) {
+ return prefix + ".profile.path";
+ }
+
+ public static String profileName(String prefix) {
+ return prefix + ".profile.name";
+ }
+
+ public static String roleArn(String prefix) {
--- End diff --
The reason is that you can assume a role via another role (via another role...), so the configuration is recursive. So I introduced these methods that build config keys based on some prefix.
But I see your point that users want to use constants to refer to config keys, so I will add some constants for the configuration of the first role:
* `AWS_ROLE_ARN`
* `AWS_ROLE_SESSION_NAME`
* `AWS_ROLE_EXTERNAL_ID`
* `AWS_ROLE_CREDENTIALS_PROVIDER`
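To illustrate why prefix-based key builders suit a recursive configuration, here is a simplified sketch that walks a config by prefix. It is not the code in this PR: only `roleArn(String prefix)` appears in the diff above, while `roleCredentialsProvider`, `describe`, and the class name are hypothetical. The key suffixes are taken from the config examples in the PR description, and the sketch returns a description string instead of building real AWS credentials providers.

```java
import java.util.Properties;

// Hypothetical sketch, for illustration only.
class RecursiveProviderSketch {

    static String roleArn(String prefix) {
        return prefix + ".role.arn";
    }

    // Assumed name for the builder of the nested provider key.
    static String roleCredentialsProvider(String prefix) {
        return prefix + ".role.provider";
    }

    /** Describes the provider chain configured under the given key prefix. */
    static String describe(Properties config, String prefix) {
        String type = config.getProperty(prefix);
        if (type == null) {
            throw new IllegalArgumentException("Missing config key: " + prefix);
        }
        if (!"ASSUME_ROLE".equals(type)) {
            // Any other provider type ends the recursion.
            return type;
        }
        String arn = config.getProperty(roleArn(prefix));
        // Recurse with the nested prefix to resolve the provider used for assuming this role.
        return "ASSUME_ROLE(" + arn + ") via " + describe(config, roleCredentialsProvider(prefix));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("aws.credentials.provider", "ASSUME_ROLE");
        config.setProperty("aws.credentials.provider.role.arn", "<role-arn>");
        config.setProperty("aws.credentials.provider.role.provider", "ASSUME_ROLE");
        config.setProperty("aws.credentials.provider.role.provider.role.arn", "<nested-role-arn>");
        config.setProperty("aws.credentials.provider.role.provider.role.provider", "AUTO");
        System.out.println(describe(config, "aws.credentials.provider"));
    }
}
```

Under this scheme, a constant for the first role is simply the builder applied to the top-level prefix, e.g. `roleArn("aws.credentials.provider")` yields `aws.credentials.provider.role.arn`.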
---
[GitHub] flink issue #6221: [FLINK-9686] [kinesis] Enable Kinesis authentication via ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/6221
Thanks @fmthoma for addressing @tzulitai's comments. Merging this PR now.
---
[GitHub] flink issue #6221: [FLINK-9686] [kinesis] Enable Kinesis authentication via ...
Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:
https://github.com/apache/flink/pull/6221
@tillrohrmann @tzulitai Thank you!
---