Posted to user@flink.apache.org by Anuj Jain <an...@gmail.com> on 2023/05/08 05:55:15 UTC

Encryption of parameters in flink-conf.yaml

Hi Community,
I am trying to create an Amazon S3 filesystem distributor using Flink, and
for this I am using the Hadoop S3A connector with the Flink filesystem sink.
My Flink application would run in a non-AWS environment, on a native cluster,
so I need to put my access keys in the Flink configuration.

For connecting to S3 storage, I am configuring flink-conf.yaml with the
access credentials, like:

s3.access.key: <access key>
s3.secret.key: <secret key>

... and some other parameters required for assuming an AWS IAM role with the
S3A AssumedRoleCredentialProvider.
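Concretely, the assumed-role parameters I am setting look roughly like this
(a sketch of my conf, with all values elided):

s3.aws.credentials.provider: org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
s3.assumed.role.arn: <role arn>
s3.assumed.role.session.name: <session name>
s3.assumed.role.session.duration: <refresh time>
s3.assumed.role.sts.endpoint: <endpoint>
s3.assumed.role.sts.endpoint.region: <region>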

Is there a way to encrypt these parameters rather than putting them in
directly, or is there some other way to supply them programmatically?

I tried to set them programmatically using the Configuration object and
supplying them with
StreamExecutionEnvironment.getExecutionEnvironment(Configuration) in my
job (rather than from flink-conf.yaml), but then the S3 connection failed. I
think Flink creates the connection pool at startup, even before the job is
started.

Thanks and Regards
Anuj Jain

Re: Encryption of parameters in flink-conf.yaml

Posted by Anuj Jain <an...@gmail.com>.
Hi,

Thanks for the reply.

I don't think I can use IAM integration and avoid distributing keys to the
application, because my Flink application is running outside AWS EC2, on
native K8s cluster nodes, from where I am distributing to S3 services
hosted on AWS.
If there is a procedure to still integrate with IAM, please point me to
some documentation.

Let me check whether using environment variables passes our security checks.
I am also trying to see if Hadoop credential providers can work with the
Flink S3A file sink:
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Storing_secrets_with_Hadoop_Credential_Providers
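
If I read that page correctly, the idea would be to keep the keys in a JCEKS
keystore and reference it from the provider path; something like this, with
the paths and values being placeholders of mine:

hadoop credential create fs.s3a.access.key -value <access key> \
  -provider jceks://file/opt/flink/secrets/s3.jceks
hadoop credential create fs.s3a.secret.key -value <secret key> \
  -provider jceks://file/opt/flink/secrets/s3.jceks

and then, in the Hadoop configuration, point at the store with:

hadoop.security.credential.provider.path: jceks://file/opt/flink/secrets/s3.jceks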

Thanks!!

Regards
Anuj

Re: Encryption of parameters in flink-conf.yaml

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Anuj,

As Martijn said, IAM is the preferred option, but if you've no other way than
access keys then environment variables are a better choice.
In that case the conf doesn't contain plain-text keys.
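
For example (a quick sketch; these are the variable names the AWS SDK looks
for, as seen in the exception you posted):

export AWS_ACCESS_KEY_ID=<access key>
export AWS_SECRET_ACCESS_KEY=<secret key>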

Just a side note: putting `s3a.access.key` into the Flink conf file is not
configuring Hadoop S3. The way to do it is to set
`flink.hadoop.s3a.access.key`.
Practically all configs must be prefixed with `flink.hadoop.` to notify Flink
that they must be forwarded to Hadoop.
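
So in flink-conf.yaml that would look roughly like this (a sketch following
the prefix rule above):

flink.hadoop.s3a.access.key: <access key>
flink.hadoop.s3a.secret.key: <secret key>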

G


Re: Encryption of parameters in flink-conf.yaml

Posted by Martijn Visser <ma...@apache.org>.
Hi Anuj,

You can't provide the values for S3 in job code, since the S3 filesystems
are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
recommended method for setting up credentials is by using IAM, not via
Access Keys. See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
for more details.

Best regards,

Martijn

Re: Encryption of parameters in flink-conf.yaml

Posted by Anuj Jain <an...@gmail.com>.
Hi,

Thanks for the reply.

Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
If I understood correctly, even with an init container the flink-conf.yaml
(inside the container) would finally contain unencrypted values for the
access tokens. We don't want to persist such sensitive data unencrypted, even
inside running containers, in files or config maps, due to security
constraints in my project.
Can you please let me know if I missed something in the suggested
solution?

Problem with overriding the configuration programmatically:
When I removed the S3 properties from flink-conf.yaml and tried to provide
them programmatically from the job code, the connection to S3 failed.
I also tried it in Application mode on a standalone cluster, but the
behavior is the same.

// My job main method (with default flink-conf.yaml):
Configuration conf = new Configuration();
conf.setString("s3a.access.key", <access-key>);
conf.setString("s3a.secret.key", <secret-key>);
conf.setString("s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
conf.setString("s3a.assumed.role.arn", <role arn>);
conf.setString("s3a.assumed.role.session.name", <session name>);
conf.setString("s3a.assumed.role.session.duration", <refresh time>);
conf.setString("s3a.assumed.role.sts.endpoint", <endpoint>);
conf.setString("s3a.assumed.role.sts.endpoint.region", <region>);

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);

// flink job program using DataStream

env.execute("My job");

With this I got a connection exception:

Caused by: org.apache.flink.util.SerializedThrowable:
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50) ~[?:?]
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177) ~[?:?]

When these values are given in flink-conf.yaml instead of the job code, the
connection is successful. Please advise if I am doing something incorrect
w.r.t. the job program.

Regards
Anuj

Re: Encryption of parameters in flink-conf.yaml

Posted by Biao Geng <bi...@gmail.com>.
Hi Anuj,

To the best of my knowledge, Flink does not provide encryption support for
configuration values for now. If you are using Flink on K8s, it is possible
to handle the encrypted parameters using an init container. You can check
this SO answer
<https://stackoverflow.com/questions/73579176/flink-kubernetes-deployment-how-to-provide-s3-credentials-from-hashicorp-vault>
for more detailed instructions.
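The rough idea there is an init container that fetches the secrets (e.g. from
Vault) and renders flink-conf.yaml into a volume shared with the Flink
container. A minimal sketch of the pod spec, with all names as placeholders:

  initContainers:
    - name: fetch-secrets
      image: <your-vault-client-image>
      command: ["sh", "-c", "<fetch secrets and write /flink-conf/flink-conf.yaml>"]
      volumeMounts:
        - name: conf
          mountPath: /flink-conf
  containers:
    - name: flink-main-container
      image: <your-flink-image>
      volumeMounts:
        - name: conf
          mountPath: /opt/flink/conf
  volumes:
    - name: conf
      emptyDir: {}

Flink then reads its configuration from /opt/flink/conf as usual.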
Besides, it should be possible to override the Configuration object in your
job code. Are you using Application mode to run the job?

Best regards,
Biao Geng
