Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/02 08:36:00 UTC

[jira] [Commented] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole

    [ https://issues.apache.org/jira/browse/FLINK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529535#comment-16529535 ] 

ASF GitHub Bot commented on FLINK-9686:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6221#discussion_r199420811
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java ---
    @@ -45,29 +45,63 @@
     		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
     		BASIC,
     
    +		/** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
    +		ASSUME_ROLE,
    +
     		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
     		AUTO,
     	}
     
     	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
     	public static final String AWS_REGION = "aws.region";
     
    +	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
    +	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    +
     	/** The AWS access key ID to use when setting credentials provider type to BASIC. */
    -	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
    +	public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
     
     	/** The AWS secret key to use when setting credentials provider type to BASIC. */
    -	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
    -
    -	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
    -	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    +	public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
     
     	/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
    -	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
    +	public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
     
     	/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
    -	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
    +	public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
     
     	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
     	public static final String AWS_ENDPOINT = "aws.endpoint";
     
    +	public static String accessKeyId(String prefix) {
    +		return prefix + ".basic.accesskeyid";
    +	}
    +
    +	public static String secretKey(String prefix) {
    +		return prefix + ".basic.secretkey";
    +	}
    +
    +	public static String profilePath(String prefix) {
    +		return prefix + ".profile.path";
    +	}
    +
    +	public static String profileName(String prefix) {
    +		return prefix + ".profile.name";
    +	}
    +
    +	public static String roleArn(String prefix) {
    --- End diff --
    
    Is there a reason to change the way key constants are defined in this class?
    That is, if the previous pattern were followed, users could simply use `AWSConfigConstants.AWS_ROLE_ARN` to set the role ARN, and likewise for the other new configuration keys.
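
A minimal sketch of what following the previous pattern might look like, for illustration only. The class name `AwsRoleConfigSketch`, the constants `AWS_ROLE_SESSION_NAME` and `AWS_ROLE_CREDENTIALS_PROVIDER`, and the helper `assumeRoleConfig` are hypothetical; only `AWS_ROLE_ARN` is named in the comment above, and the key strings are taken from the config example in the issue description.

```java
/** Hypothetical sketch: role-related keys declared as plain constants, like the existing keys. */
final class AwsRoleConfigSketch {

    /** Same key string as AWS_CREDENTIALS_PROVIDER in the diff. */
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    /** Constant name suggested in the review comment; key string from the issue's example. */
    static final String AWS_ROLE_ARN = "aws.credentials.provider.role.arn";

    /** Assumed constant name for the session name key. */
    static final String AWS_ROLE_SESSION_NAME = "aws.credentials.provider.role.sessionName";

    /** Assumed constant name for the provider that supplies credentials for assuming the role. */
    static final String AWS_ROLE_CREDENTIALS_PROVIDER = "aws.credentials.provider.role.provider";

    private AwsRoleConfigSketch() {}

    /** Builds the non-nested ASSUME_ROLE configuration from the issue's first example. */
    static java.util.Properties assumeRoleConfig(String roleArn, String sessionName) {
        java.util.Properties config = new java.util.Properties();
        config.setProperty(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
        config.setProperty(AWS_ROLE_ARN, roleArn);
        config.setProperty(AWS_ROLE_SESSION_NAME, sessionName);
        config.setProperty(AWS_ROLE_CREDENTIALS_PROVIDER, "AUTO");
        return config;
    }
}
```

Fixed constants like these cover only a single level of role assumption; the nested case described in the issue needs keys derived from a prefix.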


> Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole
> --------------------------------------------------------------------
>
>                 Key: FLINK-9686
>                 URL: https://issues.apache.org/jira/browse/FLINK-9686
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Franz Thoma
>            Assignee: Franz Thoma
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Current situation:
> FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials via one of the following mechanisms:
>  * Environment variables
>  * System properties
>  * An AWS profile
>  * Directly provided credentials ({{BASIC}})
>  * AWS's own default heuristic ({{AUTO}})
> For streaming across AWS accounts, it is considered good practice to enable access to the remote Kinesis stream via a role, rather than passing credentials for the remote account.
> h2. Proposed change:
> Add a new credentials provider specifying a role ARN, session name, and an additional credentials provider supplying the credentials for assuming the role.
> Config example for assuming role {{<role-arn>}} with auto-detected credentials:
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: <role-arn>
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: AUTO
> {code}
> {{ASSUME_ROLE}} credentials providers can be nested, i.e. it is possible to assume a role which in turn is allowed to assume another role:
> {code:java}
> aws.credentials.provider: ASSUME_ROLE
> aws.credentials.provider.role.arn: <role-arn>
> aws.credentials.provider.role.sessionName: my-session-name
> aws.credentials.provider.role.provider: ASSUME_ROLE
> aws.credentials.provider.role.provider.role.arn: <nested-role-arn>
> aws.credentials.provider.role.provider.role.sessionName: my-nested-session-name
> aws.credentials.provider.role.provider.role.provider: AUTO
> {code}
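
The nested example above can be sketched in Java by composing keys from a prefix, in the style of the helper methods shown in the diff. This is a sketch under assumptions, not the connector's actual API: the class `NestedRoleKeysSketch` and the helpers `roleSessionName` and `roleProvider` are hypothetical, and the body given here for `roleArn` is inferred from the key strings in the config example rather than taken from the pull request.

```java
/** Hypothetical sketch: building nested ASSUME_ROLE keys from a prefix. */
final class NestedRoleKeysSketch {

    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    private NestedRoleKeysSketch() {}

    // Suffixes below are assumptions inferred from the issue's config example.
    static String roleArn(String prefix) {
        return prefix + ".role.arn";
    }

    static String roleSessionName(String prefix) {
        return prefix + ".role.sessionName";
    }

    static String roleProvider(String prefix) {
        return prefix + ".role.provider";
    }

    /** Reproduces the two-level example: an outer role assumed via an inner role. */
    static java.util.Properties nestedAssumeRoleConfig(String roleArn, String nestedRoleArn) {
        java.util.Properties config = new java.util.Properties();

        // Outer level: keys hang off "aws.credentials.provider".
        String outer = AWS_CREDENTIALS_PROVIDER;
        config.setProperty(outer, "ASSUME_ROLE");
        config.setProperty(roleArn(outer), roleArn);
        config.setProperty(roleSessionName(outer), "my-session-name");

        // Inner level: the outer ".role.provider" key doubles as the prefix for the nested provider.
        String inner = roleProvider(outer);
        config.setProperty(inner, "ASSUME_ROLE");
        config.setProperty(roleArn(inner), nestedRoleArn);
        config.setProperty(roleSessionName(inner), "my-nested-session-name");
        config.setProperty(roleProvider(inner), "AUTO");
        return config;
    }
}
```

Because each level reuses the same helpers with a longer prefix, the nesting depth is not fixed in advance, which plain string constants cannot express.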


