Posted to issues@flink.apache.org by "Danny Cranmer (Jira)" <ji...@apache.org> on 2020/12/16 10:58:00 UTC

[jira] [Comment Edited] (FLINK-20602) Getting exception Connecting kinesis with mfa

    [ https://issues.apache.org/jira/browse/FLINK-20602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250248#comment-17250248 ] 

Danny Cranmer edited comment on FLINK-20602 at 12/16/20, 10:57 AM:
-------------------------------------------------------------------

I have reproduced the problem and found a workaround. The issue is not {{MFA}} itself: the {{BASIC}} credential provider does not use the session token:
- https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L181

To supply an access key, secret key, and session token (AK/SK/Token), you will have to use another {{CredentialProviderType}}; below is an example using {{SYS_PROP}}. We could improve the Kinesis connector to detect the session token and construct a {{BasicSessionCredentials}}:
- https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/auth/BasicSessionCredentials.java

{code:java}
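import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

// Expose the temporary credentials (including the session token) as JVM system properties.
Properties systemProperties = System.getProperties();
systemProperties.setProperty("aws.accessKeyId", accessKey);
systemProperties.setProperty("aws.secretKey", secretKey);
systemProperties.setProperty("aws.sessionToken", sessionToken);

// Tell the connector to read credentials from system properties instead of BASIC.
Properties producerConfig = new Properties();
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, REGION);
producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "SYS_PROP");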
{code}
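
The improvement suggested above (detect the session token and build session credentials) could look roughly like the following. This is only a sketch of the selection logic, not the connector's code: the config keys and the {{BasicCreds}} / {{SessionCreds}} classes are hypothetical stand-ins for the SDK's basic and {{BasicSessionCredentials}} types, so that the example runs without the AWS SDK on the classpath.

```java
import java.util.Properties;

final class SessionAwareCredentials {
    // Hypothetical config keys for this sketch; NOT the connector's real constants.
    static final String ACCESS_KEY = "example.accessKeyId";
    static final String SECRET_KEY = "example.secretKey";
    static final String SESSION_TOKEN = "example.sessionToken";

    /** Stand-in for long-term credentials (access key + secret key only). */
    static class BasicCreds {
        final String accessKey;
        final String secretKey;

        BasicCreds(String accessKey, String secretKey) {
            this.accessKey = accessKey;
            this.secretKey = secretKey;
        }
    }

    /** Stand-in for the SDK's BasicSessionCredentials: also carries the session token. */
    static final class SessionCreds extends BasicCreds {
        final String sessionToken;

        SessionCreds(String accessKey, String secretKey, String sessionToken) {
            super(accessKey, secretKey);
            this.sessionToken = sessionToken;
        }
    }

    /** Returns session credentials when a token is configured, basic credentials otherwise. */
    static BasicCreds resolve(Properties config) {
        String accessKey = config.getProperty(ACCESS_KEY);
        String secretKey = config.getProperty(SECRET_KEY);
        if (accessKey == null || secretKey == null) {
            throw new IllegalArgumentException("access key and secret key are required");
        }
        String token = config.getProperty(SESSION_TOKEN);
        if (token == null || token.isEmpty()) {
            return new BasicCreds(accessKey, secretKey);
        }
        return new SessionCreds(accessKey, secretKey, token);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(ACCESS_KEY, "AK");
        config.setProperty(SECRET_KEY, "SK");
        config.setProperty(SESSION_TOKEN, "TOKEN");
        // With a token present, the session-aware type is selected.
        System.out.println(resolve(config).getClass().getSimpleName());
    }
}
```

With this shape, existing configurations without a token would keep their current behaviour, while temporary credentials would no longer need the {{SYS_PROP}} workaround.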





> Getting exception Connecting kinesis with mfa 
> ----------------------------------------------
>
>                 Key: FLINK-20602
>                 URL: https://issues.apache.org/jira/browse/FLINK-20602
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>            Reporter: Avi Levi
>            Priority: Major
>
> I am trying to connect to Kinesis (running with MFA) with the following configuration:
>  
> {code:java}
> val producerConfig = new Properties()
> producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretAccessKey)
> producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN, awsSessionToken)
> {code}
> with a very simple pipeline:
>  
> {code:java}
> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig)
> producer.setFailOnError(true)
> producer.setDefaultStream(outputStreamName)
> producer.setDefaultPartition("0")
> env.fromElements("a", "b", "c").addSink(producer)
> env.execute()
> {code}
> This results in:
> {code:java}
> 15:30:44,292 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.292188] [0x0000cb5f][0x000070000512c000] [warning] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
> 15:30:44,378 INFO org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.377865] [0x0000cb5b][0x00007000082c1000] [info] [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
> 15:30:44,396 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396208] [0x0000cb55][0x0000700002a3e000] [warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
> 15:30:44,396 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396256] [0x0000cb55][0x0000700002a3e000] [error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
> Exception name: UnrecognizedClientException
> Error message: The security token included in the request is invalid.
> 6 response headers:
> connection : close
> {code}
> I double-checked that all keys are correct: the same keys work perfectly when I execute commands from the CLI. Also, when MFA is removed from Kinesis, the pipeline works as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)