Posted to user@flink.apache.org by Avi Levi <av...@neosec.com> on 2020/12/15 09:06:41 UTC

Connecting to kinesis with mfa

Hi guys,
we are struggling to connect to Kinesis when MFA is activated. I configured
everything according to the documentation but am still getting an
exception:


val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretAccessKey)
producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN,
awsSessionToken)

with a very simple pipeline:



val producer = new FlinkKinesisProducer(new SimpleStringSchema(),
producerConfig)
producer.setFailOnError(true)
producer.setDefaultStream(outputStreamName)
producer.setDefaultPartition("0")
env.fromElements("a", "b", "c").addSink(producer)
env.execute()

which results in:

15:30:44,292 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.292188] [0x0000cb5f][0x000070000512c000] [warning] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
15:30:44,378 INFO org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.377865] [0x0000cb5b][0x00007000082c1000] [info] [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
15:30:44,396 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396208] [0x0000cb55][0x0000700002a3e000] [warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
15:30:44,396 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396256] [0x0000cb55][0x0000700002a3e000] [error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close

I double-checked that all keys are correct by using the same keys that work
perfectly when I execute commands from the CLI. Also, when removing MFA
from Kinesis, the pipeline works as expected. Finally, I also opened a ticket
<https://issues.apache.org/jira/browse/FLINK-20602> for this.

Re: Connecting to kinesis with mfa

Posted by Avi Levi <av...@neosec.com>.
Awesome, thanks! looks good



Re: Connecting to kinesis with mfa

Posted by "Cranmer, Danny" <cr...@amazon.com>.
Hey Avi,

I have reproduced the issue and found a solution. The issue is not MFA; it is that the BASIC credential provider does not use the session token:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L181

If you want to supply AK/SK/Token then you will have to use another CredentialProviderType; below is an example using SYS_PROP. We could improve the Kinesis connector to detect the session token and construct a BasicSessionCredentials:
https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/auth/BasicSessionCredentials.java

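// accessKey, secretKey, sessionToken and REGION are assumed to be defined elsewhere.
// The SYS_PROP provider reads the credentials from these JVM system properties.
Properties systemProperties = System.getProperties();
systemProperties.setProperty("aws.accessKeyId", accessKey);
systemProperties.setProperty("aws.secretKey", secretKey);
systemProperties.setProperty("aws.sessionToken", sessionToken);

// No keys in the producer config itself, only the region and the provider type.
Properties producerConfig = new Properties();
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, REGION);
producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "SYS_PROP");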

I will add this to the Jira also. Let me know if you have any issues.

Thanks,
Danny
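
For anyone applying the SYS_PROP workaround from Danny's reply, here is a minimal, self-contained sketch of it as two small helpers. It is only an illustration under stated assumptions: the class and method names are hypothetical, and the two string keys are assumed to equal the values of AWSConfigConstants.AWS_REGION and AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, so that the sketch compiles without Flink on the classpath. In a real job, use the constants themselves.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical helper class; not part of Flink or the AWS SDK.
final class SysPropCredentialsSketch {

    // Assumed to match AWSConfigConstants.AWS_REGION and
    // AWSConfigConstants.AWS_CREDENTIALS_PROVIDER; prefer the constants in real code.
    static final String AWS_REGION_KEY = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER_KEY = "aws.credentials.provider";

    private SysPropCredentialsSketch() {}

    // Publishes temporary credentials under the system property names used in the reply.
    // All three values are validated before any property is changed.
    public static void exportSessionCredentials(String accessKey, String secretKey, String sessionToken) {
        Objects.requireNonNull(accessKey, "accessKey");
        Objects.requireNonNull(secretKey, "secretKey");
        Objects.requireNonNull(sessionToken, "sessionToken");
        System.setProperty("aws.accessKeyId", accessKey);
        System.setProperty("aws.secretKey", secretKey);
        System.setProperty("aws.sessionToken", sessionToken);
    }

    // Builds a producer config that carries no keys itself and defers to SYS_PROP.
    public static Properties producerConfig(String region) {
        Properties config = new Properties();
        config.setProperty(AWS_REGION_KEY, Objects.requireNonNull(region, "region"));
        config.setProperty(AWS_CREDENTIALS_PROVIDER_KEY, "SYS_PROP");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        exportSessionCredentials("example-access-key", "example-secret-key", "example-session-token");
        Properties config = producerConfig("us-east-1");
        System.out.println(config.getProperty(AWS_CREDENTIALS_PROVIDER_KEY)); // prints SYS_PROP
    }
}
```

The resulting Properties object would be passed to the FlinkKinesisProducer constructor in place of the config from the original question. Note that system properties are JVM-wide, so on a cluster they presumably have to be visible in the JVM that actually runs the sink, not only in the client that submits the job.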


Re: Connecting to kinesis with mfa

Posted by Avi Levi <av...@neosec.com>.
Thanks Robert, I actually tried all of the above but got the same
unfortunate result.


Re: Connecting to kinesis with mfa

Posted by Robert Metzger <rm...@apache.org>.
Hey Avi,

Maybe providing the secret/access key together with a session token doesn't
work, and you need to provide only one or the other?
https://docs.aws.amazon.com/credref/latest/refdocs/setting-global-aws_session_token.html

I'll also ping some AWS contributors active in Flink to take a look at this.

Best,
Robert
