Posted to user@flink.apache.org by Yang Wang <da...@gmail.com> on 2020/03/02 01:39:38 UTC

Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

Hi Niels,

You are right. The S3-related configuration you set in your `main()` is
applied only on the client side, because the filesystem is initialized just
once, in the entrypoint of the JM/TM. AFAIK, we cannot provide different
credentials for each job in the same session cluster.
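
As a rough illustration of why this happens, here is a minimal sketch in plain
Java. It is not Flink code: `StaticConfigDemo`, `ReadTask`, the `endpoint`
field and the `s3://bucket/input.txt` path are all made up for this example.
It only shows the general Java behavior that a value held in a static field
is not part of a serialized object, so a process that receives only the
serialized object never sees that value. The "other JVM" is simulated by
resetting the static field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticConfigDemo {

    /** Hypothetical stand-in for process-wide settings kept in a static field. */
    static String endpoint = null;

    /** Hypothetical stand-in for a user function that is serialized and shipped. */
    static class ReadTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final String path;

        ReadTask(String path) {
            this.path = path;
        }

        String describe() {
            // Reads the static field of whichever JVM this runs in.
            return path + " via endpoint " + endpoint;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Client side": set the static field, then serialize the task.
        endpoint = "s3.example.nl";
        byte[] shipped = serialize(new ReadTask("s3://bucket/input.txt"));

        // Simulate a fresh worker JVM whose static state the client never touched.
        endpoint = null;

        // "Worker side": the instance field survived, the static value did not.
        ReadTask onWorker = (ReadTask) deserialize(shipped);
        System.out.println(onWorker.describe());
    }
}
```

Running `main` prints the path followed by `via endpoint null`. Under these
assumptions, that mirrors task managers starting without the values that were
set on the client. Options passed when the session cluster is started (as
Niels does below with `-D` options to `kubernetes-session.sh`) are read when
the JM/TM processes start, so they are visible there.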

Best,
Yang

Niels Basjes <Ni...@basjes.nl> wrote on Fri, Feb 28, 2020 at 11:09 PM:

> Hi,
>
> As I mentioned in my original email I already verified that the endpoints
> were accessible from the pods, that was not the problem.
>
> It took me a while but I've figured out what went wrong.
>
> Setting the configuration like I did
>
> final Configuration conf = new Configuration();
> conf.setString("presto.s3.endpoint",   "s3.example.nl");
> conf.setString("presto.s3.access-key", "myAccessKey");
> conf.setString("presto.s3.secret-key", "mySecretKey");
> FileSystem.initialize(conf, null);
>
> sets it in some static variables that do not get serialized and shipped
> into the task managers.
>
> As a consequence, in the absence of credentials the AWS/S3 client
> assumes it is running inside AWS and that it can retrieve the credentials
> from http://169.254.170.2 (which is not routable).
> Because this is not AWS, it cannot do so, and I get the error that it
> cannot connect.
>
> For now my solution is to start the Flink Session with this
> #!/bin/bash
> ./flink-1.10.0/bin/kubernetes-session.sh \
>   -Dkubernetes.cluster-id=flink1100 \
>   -Dtaskmanager.memory.process.size=8192m \
>   -Dkubernetes.taskmanager.cpu=2 \
>   -Dtaskmanager.numberOfTaskSlots=4 \
>   -Dresourcemanager.taskmanager-timeout=3600000 \
>   -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
>   -Dpresto.s3.endpoint=s3.example.nl \
>   -Dpresto.s3.access-key=MyAccessKey \
>   -Dpresto.s3.secret-key=MySecretKey \
>   -Dpresto.s3.path.style.access=true
>
> I dislike this because now ALL jobs in this Flink cluster have the same
> credentials.
>
> Is there a way to set the S3 credentials on a per job or even per
> connection basis?
>
> Niels Basjes
>
>
> On Fri, Feb 28, 2020 at 4:38 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi Niels,
>>
>> Glad to hear that you are trying the Flink native K8s integration and
>> sharing your feedback.
>>
>>> What is causing the differences in behavior between local and in k8s? It
>>> works locally but not in the cluster.
>>
>>
>> In your case, the job could be executed successfully locally. That means
>> the S3 endpoint could be accessed in your local network environment. When
>> you submit the job to the K8s cluster, the user `main()` is executed
>> locally to build the job graph, which is then submitted to the cluster for
>> execution. The S3 endpoint is then accessed from within the K8s network.
>> So maybe there is something wrong with the network between the
>> taskmanager and the S3 endpoint.
>>
>>> How do I figure out what network it is trying to reach in k8s?
>>
>>
>> I am not an S3 expert, so I am not sure whether the SDK fetches the
>> credentials from the S3 endpoint. If it does, I think you need to find out
>> which taskmanager the source operator is running on. Then exec into the
>> Pod and use nslookup/curl to make sure the endpoint "s3.example.nl" can be
>> resolved and accessed successfully.
>>
>>
>>
>> Best,
>> Yang
>>
>>
>> Niels Basjes <Ni...@basjes.nl> wrote on Fri, Feb 28, 2020 at 4:56 AM:
>>
>>> Hi,
>>>
>>> I have a problem with accessing my own S3 system from within Flink when
>>> running on Kubernetes.
>>>
>>> *TL;DR* I have my own S3 (Ceph). Locally my application works, but when
>>> running in K8s it fails with
>>>
>>> Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>>> Caused by: java.net.SocketException: Network is unreachable (connect failed)
>>>
>>>
>>> I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
>>> the S3 gateway that is included with it.
>>> I have put a file on this 'S3' and in my Flink 1.10.0 application I do
>>> this:
>>>
>>> StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> final Configuration conf = new Configuration();
>>> conf.setString("presto.s3.endpoint",   "s3.example.nl");
>>> conf.setString("presto.s3.access-key", "myAccessKey");
>>> conf.setString("presto.s3.secret-key", "mySecretKey");
>>> FileSystem.initialize(conf, null);
>>>
>>> senv.setParallelism(2);
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> DataStream<String> rawInputStream = senv
>>>     .readTextFile(path).name("Read input");
>>>
>>> ...
>>>
>>>
>>> The s3.example.nl is the hostname of the ingress I have attached to the
>>> S3 endpoint. In my case it is accessible via both http and https (with a
>>> valid LetsEncrypt certificate).
>>>
>>> When I run this locally from within IntelliJ it works like a charm,
>>> reads the data, does some stuff with it and then writes it to ElasticSearch.
>>>
>>> I have created an additional layer to enable the s3-fs-presto plugin
>>> with this Dockerfile:
>>>
>>>
>>> FROM flink:1.10.0-scala_2.12
>>> RUN mkdir /opt/flink/plugins/s3-fs-presto && cp \
>>>     /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
>>>
>>>
>>> I run flink with this customized docker image like this
>>>
>>>
>>> #!/bin/bash
>>> ./flink-1.10.0/bin/kubernetes-session.sh \
>>>   -Dkubernetes.cluster-id=flink1100 \
>>>   -Dtaskmanager.memory.process.size=8192m \
>>>   -Dkubernetes.taskmanager.cpu=2 \
>>>   -Dtaskmanager.numberOfTaskSlots=8 \
>>>   -Dresourcemanager.taskmanager-timeout=3600000 \
>>>   -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto
>>>
>>>
>>> I then submit this into Kubernetes with this command
>>>
>>> flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 \
>>>   target/flink-table-esloader-0.1-SNAPSHOT.jar
>>>
>>>
>>> The job starts and after about 40 seconds the job fails with this
>>> exception:
>>>
>>> *Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>>>     at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>>>     at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
>>>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
>>>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>> *Caused by: java.net.SocketException: Network is unreachable (connect failed)*
>>>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>     at java.net.Socket.connect(Socket.java:607)
>>>     at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>>>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>>>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>>>     at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>>>     at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>>>     at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>>>     at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
>>>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
>>>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
>>>     at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
>>>     at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>>>     at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>>>     at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>>>     at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
>>>     ... 28 more
>>>
>>>
>>> I have tried this with
>>>
>>> conf.setString("presto.s3.endpoint",         "s3.example.nl");
>>>
>>> and also by using the ClusterIP and the LoadBalancer IP and I get the
>>> same error in all cases.
>>>
>>> I have verified by logging into the task manager pod that all of
>>> these endpoints return a sensible result when simply running curl from
>>> the command line.
>>>
>>> I have s3cmd installed locally on my laptop.
>>> My ~/.s3cfg looks like this and I can fully access this S3 setup.
>>>
>>>
>>> [default]
>>> access_key = myAccessKey
>>> secret_key = mySecretKey
>>> host_base = s3.example.nl
>>>
>>>
>>> *I'm stuck, please help:*
>>>
>>>    - What is causing the differences in behaviour between local and in
>>>    k8s? It works locally but not in the cluster.
>>>    - How do I figure out what network it is trying to reach in k8s?
>>>
>>>
>>> Thanks.
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>