Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2020/02/27 20:55:44 UTC

[Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Hi,

I have a problem with accessing my own S3 system from within Flink when
running on Kubernetes.

*TL;DR* I have my own S3 (Ceph). Locally my application works; when running
in K8s it fails with

Caused by: com.amazonaws.SdkClientException: Unable to load credentials
from service endpoint
Caused by: java.net.SocketException: Network is unreachable (connect failed)


I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
the S3 gateway that is included with it.
I have put a file on this 'S3', and in my Flink 1.10.0 application I do this:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint",   "s3.example.nl");
conf.setString("presto.s3.access-key", "myAccessKey");
conf.setString("presto.s3.secret-key", "mySecretKey");
FileSystem.initialize(conf, null);

senv.setParallelism(2);
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<String> rawInputStream = senv
    .readTextFile(path).name("Read input");

...


s3.example.nl is the hostname of the ingress I have attached to the S3
endpoint. In my case it is accessible via both http and https (with a valid
LetsEncrypt certificate).

When I run this locally from within IntelliJ it works like a charm: it reads
the data, does some stuff with it and then writes it to Elasticsearch.

I have created an additional image layer to enable the flink-s3-fs-presto
plugin with this Dockerfile:


FROM flink:1.10.0-scala_2.12
RUN mkdir /opt/flink/plugins/s3-fs-presto && \
    cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
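
For completeness, building and pushing that image goes something like this
(a sketch; the tag matches the one used below):

docker build -t docker.example.nl/flink:1.10.0-2.12-s3-presto .
docker push docker.example.nl/flink:1.10.0-2.12-s3-presto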


I run Flink with this customized Docker image like this:


#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=8 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto


I then submit my job into Kubernetes with this command:

flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 \
  target/flink-table-esloader-0.1-SNAPSHOT.jar


The job starts, and after about 40 seconds it fails with this exception:

*Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
*Caused by: java.net.SocketException: Network is unreachable (connect failed)*
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
... 28 more


I have tried this with

conf.setString("presto.s3.endpoint",         "s3.example.nl");

and also with the ClusterIP and the LoadBalancer IP, and I get the same
error in all cases.

I have verified, by logging into the task manager pod, that all of these
endpoints show a sensible result when simply doing a curl from the
command line.
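
The check looked roughly like this (the pod name is a placeholder, and this
assumes curl is available in the Flink image):

kubectl exec -it <taskmanager-pod> -- curl -sI https://s3.example.nl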

I have s3cmd installed locally on my laptop.
My ~/.s3cfg looks like this, and I can fully access this S3 setup:


[default]
access_key = myAccessKey
secret_key = mySecretKey
host_base = s3.example.nl
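
With that config, access works fine, e.g. listing a bucket (the bucket name
is a placeholder):

s3cmd ls s3://my-bucket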


*I'm stuck, please help:*

   - What is causing the differences in behaviour between local and in k8s?
   It works locally but not in the cluster.
   - How do I figure out what network it is trying to reach in k8s?


Thanks.
-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Posted by Yang Wang <da...@gmail.com>.
Hi Niels,

You are right. The S3-related configuration you have set in your `main()`
is only applicable on the client side, since the filesystem is initialized
only once, in the entrypoint of the JM/TM. AFAIK, we cannot provide
different credentials for each job in the same session cluster.
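
Concretely, such settings have to be supplied cluster-wide, e.g. in
flink-conf.yaml or via the equivalent -D dynamic properties. A sketch with
placeholder values, using the same presto.s3.* keys as elsewhere in this
thread:

# flink-conf.yaml of the session cluster (values are placeholders)
presto.s3.endpoint: s3.example.nl
presto.s3.access-key: myAccessKey
presto.s3.secret-key: mySecretKey
presto.s3.path.style.access: true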

Best,
Yang

Niels Basjes <Ni...@basjes.nl> wrote on Fri, Feb 28, 2020 at 11:09 PM:

> [quoted message trimmed; it appears in full below]

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

As I mentioned in my original email, I already verified that the endpoints
were accessible from the pods; that was not the problem.

It took me a while, but I've figured out what went wrong.

Setting the configuration like I did

final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint",   "s3.example.nl");
conf.setString("presto.s3.access-key", "myAccessKey");
conf.setString("presto.s3.secret-key", "mySecretKey");
FileSystem.initialize(conf, null);

sets it in some static variables that do not get serialized and shipped
into the task managers.

As a consequence, in the absence of credentials the AWS/S3 client assumes
it is running inside AWS and that it can retrieve the credentials from
http://169.254.170.2  (which is non-routable).
Because this is not AWS, it cannot do this, and I get the error that it
cannot connect.
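
A minimal sketch of that fallback, pieced together from the AWS SDK v1
classes visible in the stack trace (my illustration, not the actual
PrestoS3FileSystem code):

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;

class CredentialsFallbackSketch {
    // If no keys were configured in the TM's JVM, the SDK falls back to the
    // instance metadata provider, which only answers from inside AWS.
    static AWSCredentialsProvider pick(String accessKey, String secretKey) {
        if (accessKey != null && secretKey != null) {
            return new AWSStaticCredentialsProvider(
                new BasicAWSCredentials(accessKey, secretKey));
        }
        return InstanceProfileCredentialsProvider.getInstance(); // unreachable outside AWS
    }
}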

For now my solution is to start the Flink session with this:

#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
  -Dpresto.s3.endpoint=s3.example.nl \
  -Dpresto.s3.access-key=MyAccessKey \
  -Dpresto.s3.secret-key=MySecretKey \
  -Dpresto.s3.path.style.access=true

I dislike this because now ALL jobs in this Flink cluster have the same
credentials.

Is there a way to set the S3 credentials on a per-job or even
per-connection basis?

Niels Basjes


On Fri, Feb 28, 2020 at 4:38 AM Yang Wang <da...@gmail.com> wrote:

> [quoted message trimmed; it appears in full below]

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Posted by Yang Wang <da...@gmail.com>.
Hi Niels,

Glad to hear that you are trying the Flink native K8s integration and
sharing your feedback.

> What is causing the differences in behavior between local and in k8s? It
> works locally but not in the cluster.


In your case, the job could be executed successfully locally. That means
the S3 endpoint could be accessed from your local network environment. When
you submit the job to the K8s cluster, the user `main()` will be executed
locally to build the job graph. Then it will be submitted to the cluster
for execution, and the S3 endpoint will be accessed from within the K8s
network. So maybe there is something wrong with the network between the
taskmanager and the S3 endpoint.

> How do I figure out what network it is trying to reach in k8s?


I am not an expert on S3, so I am not sure whether the SDK will fetch the
credentials from the S3 endpoint. If it does, I think you need to find out
which taskmanager the source operator is running on. Then exec into the Pod
and use nslookup/curl to make sure the endpoint "s3.example.nl" can be
resolved and accessed successfully.
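
For example (a sketch; the pod name is a placeholder and this assumes
nslookup is available in the image):

kubectl get pods | grep taskmanager
kubectl exec -it <taskmanager-pod> -- nslookup s3.example.nl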



Best,
Yang


Niels Basjes <Ni...@basjes.nl> wrote on Fri, Feb 28, 2020 at 4:56 AM:

> [quoted message trimmed; it appears in full at the top of this thread]