Posted to issues@flink.apache.org by fmthoma <gi...@git.apache.org> on 2018/06/28 07:42:40 UTC

[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...

GitHub user fmthoma opened a pull request:

    https://github.com/apache/flink/pull/6221

    [FLINK-9686] [kinesis] Enable Kinesis authentication via AssumeRole

    ## What is the purpose of the change
    
    Enable `FlinkKinesisProducer` to authenticate via assuming a role.
    
    ### Current situation:
    
    FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials via one of the following mechanisms:
    
    * Environment variables
    * System properties
    * An AWS profile
    * Directly provided credentials (`BASIC`)
    * AWS's own default heuristic (`AUTO`)
    
    For streaming across AWS accounts, it is considered good practise to enable access to the remote Kinesis stream via a role, rather than passing credentials for the remote account.
    
    ### Proposed change:
    
    Add a new credentials provider that is configured with a role ARN, a session name, and an additional credentials provider that supplies the credentials used to assume the role.
    
    Config example for assuming role `<role-arn>` with auto-detected credentials:
    
    ```
    aws.credentials.provider: ASSUME_ROLE
    aws.credentials.provider.role.arn: <role-arn>
    aws.credentials.provider.role.sessionName: my-session-name
    aws.credentials.provider.role.provider: AUTO
    ```
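    As a minimal sketch, the same single-role configuration can be assembled programmatically as a `java.util.Properties` object, which is the kind of object one would hand to the `FlinkKinesisProducer` constructor. The class and method names (`AssumeRoleConfigExample`, `assumeRoleConfig`), the `aws.region` value, and the role ARN are illustrative assumptions, not part of this PR.

    ```java
    import java.util.Properties;

    public class AssumeRoleConfigExample {

        // Builds properties for assuming a single role. The base credentials used
        // to call AssumeRole are resolved via AUTO (AWS's default provider chain).
        static Properties assumeRoleConfig(String region, String roleArn, String sessionName) {
            Properties config = new Properties();
            config.setProperty("aws.region", region);
            config.setProperty("aws.credentials.provider", "ASSUME_ROLE");
            config.setProperty("aws.credentials.provider.role.arn", roleArn);
            config.setProperty("aws.credentials.provider.role.sessionName", sessionName);
            config.setProperty("aws.credentials.provider.role.provider", "AUTO");
            return config;
        }

        public static void main(String[] args) {
            Properties config = assumeRoleConfig(
                    "us-east-1",
                    "arn:aws:iam::123456789012:role/example-kinesis-writer", // hypothetical ARN
                    "my-session-name");
            // Print the keys in sorted order to show the resulting flat config.
            config.stringPropertyNames().stream().sorted()
                    .forEach(k -> System.out.println(k + ": " + config.getProperty(k)));
        }
    }
    ```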
    
    `ASSUME_ROLE` credentials providers can be nested, i.e. it is possible to assume a role which in turn is allowed to assume another role:
    
    ```
    aws.credentials.provider: ASSUME_ROLE
    aws.credentials.provider.role.arn: <role-arn>
    aws.credentials.provider.role.sessionName: my-session-name
    aws.credentials.provider.role.provider: ASSUME_ROLE
    aws.credentials.provider.role.provider.role.arn: <nested-role-arn>
    aws.credentials.provider.role.provider.role.sessionName: my-nested-session-name
    aws.credentials.provider.role.provider.role.provider: AUTO
    ```
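    To make the nesting concrete, here is a hypothetical sketch (not the connector's actual parsing code; `NestedRoleChain`, `roleChain`, and `baseProvider` are made-up names) that walks the nested keys. Note the order: the innermost provider (`AUTO` here) supplies the base credentials, which are used to assume `<nested-role-arn>` first; the resulting credentials then assume `<role-arn>`, whose credentials are finally used against Kinesis.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class NestedRoleChain {

        // Follows nested ASSUME_ROLE providers starting at `prefix` and returns the
        // role ARNs in config order: the first entry is the role used for Kinesis,
        // the last entry is the role assumed first with the base credentials.
        static List<String> roleChain(Properties config, String prefix) {
            List<String> arns = new ArrayList<>();
            String key = prefix;
            while ("ASSUME_ROLE".equals(config.getProperty(key))) {
                String arn = config.getProperty(key + ".role.arn");
                if (arn == null) {
                    throw new IllegalArgumentException("Missing " + key + ".role.arn");
                }
                arns.add(arn);
                key = key + ".role.provider"; // provider used to assume this role
            }
            return arns;
        }

        // Returns the provider type at the bottom of the chain (e.g. "AUTO"),
        // or null if it is not configured.
        static String baseProvider(Properties config, String prefix) {
            String key = prefix;
            while ("ASSUME_ROLE".equals(config.getProperty(key))) {
                key = key + ".role.provider";
            }
            return config.getProperty(key);
        }

        public static void main(String[] args) {
            Properties config = new Properties();
            config.setProperty("aws.credentials.provider", "ASSUME_ROLE");
            config.setProperty("aws.credentials.provider.role.arn", "<role-arn>");
            config.setProperty("aws.credentials.provider.role.sessionName", "my-session-name");
            config.setProperty("aws.credentials.provider.role.provider", "ASSUME_ROLE");
            config.setProperty("aws.credentials.provider.role.provider.role.arn", "<nested-role-arn>");
            config.setProperty("aws.credentials.provider.role.provider.role.sessionName", "my-nested-session-name");
            config.setProperty("aws.credentials.provider.role.provider.role.provider", "AUTO");

            System.out.println(roleChain(config, "aws.credentials.provider"));   // [<role-arn>, <nested-role-arn>]
            System.out.println(baseProvider(config, "aws.credentials.provider")); // AUTO
        }
    }
    ```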
    
    ## Brief change log
    
    - Add `aws.credentials.provider` option `ASSUME_ROLE` for authenticating via assuming a role.
    
    ## Verifying this change
    
    This feature is not covered by existing tests, and non-trivial tests are hard to add. It can be verified manually:
    * Create an AWS IAM user and a role, and give the user permissions to assume the role.
    * Create a Kinesis stream, and give the role permissions to write to this stream, but not the user.
    * Set up the config to assume the role, passing the user's credentials as the credentials for assuming it.
    * The Kinesis producer should now be able to write to the stream.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): yes
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no, although a config option is added.
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? docs, JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fmthoma/flink enableRoles

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6221
    
----
commit 45adaa5a14a3db340fce878db50ce6ed8716fe6c
Author: Franz Thoma <fr...@...>
Date:   2018-05-14T13:02:12Z

    [FLINK-9686] [kinesis] Allow creating AWS credentials by assuming a role
    
    Config example:
    
    ```
    aws.credentials.provider: ASSUME_ROLE
    aws.credentials.provider.role.arn: <arn>
    aws.credentials.provider.role.sessionName: session-name
    aws.credentials.provider.role.provider: AUTO
    ```

commit 3189b48a6cac2898f63cbf75105f544cde32ff58
Author: Franz Thoma <fr...@...>
Date:   2018-05-14T13:04:08Z

    [FLINK-9686] [kinesis] Housekeeping: Use early return instead of variable assignment and break

commit 6fe344c8adb9d2c0bed0006216105b0a5032da55
Author: Franz Thoma <fr...@...>
Date:   2018-05-14T13:13:06Z

    [FLINK-9686] [kinesis] Housekeeping

commit eb404061e9c4f87dbba1d18a3300fc190af5ea92
Author: Franz Thoma <fr...@...>
Date:   2018-05-15T13:33:50Z

    [FLINK-9686] [kinesis] Add dependency on aws-java-sdk-sts
    
    Implicitly (via `Class.forName`) used by `STSProfileCredentialsServiceProvider`.
    Due to shading, it is not possible to treat this as a "provided" dependency, as
    Maven rewrites the class name with the shaded one, which would force clients to
    provide aws-java-sdk-sts shaded in the same way.
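    A small, self-contained illustration (not code from the AWS SDK or from this PR) of why a by-name lookup is fragile: `Class.forName` resolves the string only at runtime, so if shading rewrites the literal to a relocated name, an unshaded class supplied by the user is not found. The class `OptionalClassCheck` and the relocated class name in `main` are hypothetical.

    ```java
    public class OptionalClassCheck {

        // Returns true if a class with the given fully qualified name can be
        // loaded (without initializing it), false on ClassNotFoundException.
        static boolean isClassPresent(String className) {
            try {
                Class.forName(className, false, OptionalClassCheck.class.getClassLoader());
                return true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(isClassPresent("java.util.Properties")); // true
            // Hypothetical relocated name: no such class exists on the classpath.
            System.out.println(isClassPresent("shaded.example.com.amazonaws.SomeStsClass")); // false
        }
    }
    ```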

commit d7ef8b977f379b7178260cbaf7bcdde6b6b3df1a
Author: Franz Thoma <fr...@...>
Date:   2018-06-28T07:42:20Z

    [FLINK-9686] [kinesis] Mention new config option in docs

----


---

[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6221


---

[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6221#discussion_r199420811
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java ---
    @@ -45,29 +45,63 @@
     		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
     		BASIC,
     
    +		/** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
    +		ASSUME_ROLE,
    +
     		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
     		AUTO,
     	}
     
     	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
     	public static final String AWS_REGION = "aws.region";
     
    +	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
    +	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    +
     	/** The AWS access key ID to use when setting credentials provider type to BASIC. */
    -	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
    +	public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
     
     	/** The AWS secret key to use when setting credentials provider type to BASIC. */
    -	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
    -
    -	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
    -	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    +	public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
     
     	/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
    -	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
    +	public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
     
     	/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
    -	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
    +	public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
     
     	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
     	public static final String AWS_ENDPOINT = "aws.endpoint";
     
    +	public static String accessKeyId(String prefix) {
    +		return prefix + ".basic.accesskeyid";
    +	}
    +
    +	public static String secretKey(String prefix) {
    +		return prefix + ".basic.secretkey";
    +	}
    +
    +	public static String profilePath(String prefix) {
    +		return prefix + ".profile.path";
    +	}
    +
    +	public static String profileName(String prefix) {
    +		return prefix + ".profile.name";
    +	}
    +
    +	public static String roleArn(String prefix) {
    --- End diff --
    
    Is there a reason to change the way key constants are defined in this class?
    That is, if the previous pattern were followed, users could simply use `AWSConfigConstants.AWS_ROLE_ARN` to set the role ARN, and likewise for the other new options.


---

[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6221#discussion_r199462901
  
    
    The reason is that a role can be assumed via another role (which can in turn be assumed via yet another role, and so on), so the configuration is recursive. That is why I introduced these methods, which build config keys from a given prefix.
    
    But I see your point that users want to use constants to refer to config keys, so I will add some constants for the configuration of the first role:
    * `AWS_ROLE_ARN`
    * `AWS_ROLE_SESSION_NAME`
    * `AWS_ROLE_EXTERNAL_ID`
    * `AWS_ROLE_CREDENTIALS_PROVIDER`
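    A minimal sketch of the compromise described above, combining prefix-based helper methods (for nested roles) with plain constants for the first role. The class name `RoleConfigKeys` is hypothetical, and the `.role.externalId` suffix is an assumption, since only the `.role.arn`, `.role.sessionName`, and `.role.provider` keys appear in the config examples of this PR.

    ```java
    public final class RoleConfigKeys {

        /** Top-level key selecting the credentials provider type. */
        public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

        // Constants for the first (outermost) role, derived from the helpers
        // below so they cannot drift apart from the nested keys.
        public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
        public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
        public static final String AWS_ROLE_EXTERNAL_ID = roleExternalId(AWS_CREDENTIALS_PROVIDER);
        public static final String AWS_ROLE_CREDENTIALS_PROVIDER = roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);

        private RoleConfigKeys() {}

        public static String roleArn(String prefix) { return prefix + ".role.arn"; }

        public static String roleSessionName(String prefix) { return prefix + ".role.sessionName"; }

        // Assumed suffix: the external-ID key is not spelled out in this discussion.
        public static String roleExternalId(String prefix) { return prefix + ".role.externalId"; }

        public static String roleCredentialsProvider(String prefix) { return prefix + ".role.provider"; }

        public static void main(String[] args) {
            System.out.println(AWS_ROLE_ARN); // aws.credentials.provider.role.arn
            // Key for the ARN of the nested role used to assume the first one:
            System.out.println(roleArn(AWS_ROLE_CREDENTIALS_PROVIDER)); // aws.credentials.provider.role.provider.role.arn
        }
    }
    ```

    Users can set the first role with the constants, while the helpers remain available for deeper nesting.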


---

[GitHub] flink issue #6221: [FLINK-9686] [kinesis] Enable Kinesis authentication via ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6221
  
    Thanks @fmthoma for addressing @tzulitai's comments. Merging this PR now.


---

[GitHub] flink issue #6221: [FLINK-9686] [kinesis] Enable Kinesis authentication via ...

Posted by fmthoma <gi...@git.apache.org>.
Github user fmthoma commented on the issue:

    https://github.com/apache/flink/pull/6221
  
    @tillrohrmann @tzulitai Thank you!


---