Posted to issues@flink.apache.org by "Mario Georgiev (JIRA)" <ji...@apache.org> on 2019/01/25 09:43:00 UTC

[jira] [Commented] (FLINK-11429) Flink fails to authenticate s3a with core-site.xml

    [ https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16752090#comment-16752090 ] 

Mario Georgiev commented on FLINK-11429:
-----------------------------------------

Tried using Flink's Hadoop 2.8 binary and providing the appropriate Hadoop lib dependencies, but still to no avail.

Putting the S3 secret key and access key in ENV variables works, while core-site.xml is completely ignored no matter where I put it.
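
For reference, this is roughly how the working env-based setup looks (the variable names are the standard ones read by the AWS SDK's EnvironmentVariableCredentialsProvider, which appears in the stack trace below; the values are placeholders):

{code:java}
# Hypothetical Dockerfile excerpt: supplying credentials via the environment
# variables that EnvironmentVariableCredentialsProvider reads. In practice
# these would come from Kubernetes secrets rather than being baked in.
ENV AWS_ACCESS_KEY_ID=<hidden>
ENV AWS_SECRET_ACCESS_KEY=<hidden>
{code}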

> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
>                 Key: FLINK-11429
>                 URL: https://issues.apache.org/jira/browse/FLINK-11429
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: Mario Georgiev
>            Priority: Critical
>
> Hello,
> The problem is: if I put the core-site.xml somewhere in the Flink image and point flink-conf.yaml at its location, it does not get picked up and I get this exception:
> {code:java}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> ... 31 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
> ... 48 more
> Caused by: java.net.SocketException: Network unreachable (connect failed)
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:589)
> at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
> at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
> at sun.net.www.http.HttpClient.New(HttpClient.java:339)
> at sun.net.www.http.HttpClient.New(HttpClient.java:357)
> at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
> at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
> at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
> at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
> {code}
> However, if I put the ACCESS_KEY and SECRET_KEY in ENV variables in the Dockerfile, they get picked up and it works. Why is the core-site.xml below being disregarded? Even if I don't copy the core-site.xml at all, it still works with just the ENV variables.
> {code:java}
> <configuration>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>     <!-- Comma separated list of local directories used to buffer
>          large results prior to transmitting them to S3. -->
>     <property>
>         <name>fs.s3a.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>     <property>
>         <name>fs.s3a.access.key</name>
>         <description>AWS access key ID.
>             Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>     <property>
>         <name>fs.s3a.secret.key</name>
>         <description>AWS secret key.
>             Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>     <property>
>         <name>fs.s3a.aws.credentials.provider</name>
>         <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>     </property>
> </configuration>
> {code}
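> For what it's worth, the Flink docs also describe putting the credentials in flink-conf.yaml instead; the shaded flink-s3-fs-hadoop forwards s3.* keys into the underlying s3a configuration. A minimal sketch of that variant (values are placeholders):
> {code:java}
> # flink-conf.yaml: credentials for the shaded S3 filesystem, forwarded to
> # the s3a Hadoop configuration (per the Flink S3 documentation).
> s3.access-key: <hidden>
> s3.secret-key: <hidden>
> {code}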
> I am building the Kubernetes standalone image as follows:
> Dockerfile:
> {code:java}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> FROM openjdk:8-jre-alpine
> # Install requirements
> # Modification to the original Dockerfile to support RocksDB: the line below is
> # commented out and replaced further down, where libc6-compat is added as the
> # RocksDB compatibility fix.
> # RUN apk add --no-cache bash snappy
> # Flink environment variables
> ENV FLINK_INSTALL_PATH=/opt
> ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
> ENV FLINK_LIB_DIR $FLINK_HOME/lib
> ENV PATH $PATH:$FLINK_HOME/bin
> ENV FLINK_CONF $FLINK_HOME/conf
> # flink-dist can point to a directory or a tarball on the local system
> ARG flink_dist=NOT_SET
> ARG job_jar=NOT_SET
> # Install build dependencies and flink
> ADD $flink_dist $FLINK_INSTALL_PATH
> ADD $job_jar $FLINK_INSTALL_PATH/job.jar
> RUN set -x && \
>   ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
>   ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
>   addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
>   chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
>   chown -h flink:flink $FLINK_HOME
> # Modification to original Dockerfile
> RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
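> # Make the Hadoop config visible to Flink: copy core-site.xml into the image
> # and point fs.hdfs.hadoopconf at its directory via flink-conf.yaml.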
> COPY core-site.xml /etc/hadoop/conf/core-site.xml
> RUN echo "fs.hdfs.hadoopconf: /etc/hadoop/conf" >> $FLINK_CONF/flink-conf.yaml
> COPY docker-entrypoint.sh /
> RUN chmod +x docker-entrypoint.sh
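> # Pull the S3 filesystem jars and their dependencies into Flink's lib dir.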
> RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.7.3.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.183/aws-java-sdk-s3-1.11.183.jar
> RUN wget -O $FLINK_LIB_DIR/flink-s3-fs-hadoop-1.7.1.jar http://central.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.7.1/flink-s3-fs-hadoop-1.7.1.jar
> # Transitive dependency of aws-java-sdk-s3
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.183/aws-java-sdk-core-1.11.183.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.183/aws-java-sdk-kms-1.11.183.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.6.7/jackson-annotations-2.6.7.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-core-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.6.7/jackson-core-2.6.7.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.6.7/jackson-databind-2.6.7.jar
> RUN wget -O $FLINK_LIB_DIR/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
> RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.4.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar
> RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar
> # Modification to original Dockerfile
> USER flink
> EXPOSE 8081 6123
> ENTRYPOINT ["/docker-entrypoint.sh"]
> CMD ["--help"]
> {code}
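> The image is then built roughly like this (a hypothetical invocation; the dist tarball and job jar paths are placeholders for the ARGs above):
> {code:java}
> # Hypothetical build command matching the flink_dist / job_jar build args.
> docker build \
>   --build-arg flink_dist=flink-1.7.1-bin-hadoop28-scala_2.11.tgz \
>   --build-arg job_jar=target/job.jar \
>   -t flink-job-cluster .
> {code}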
> The job being run is a simple WordCount:
> {code:java}
> import org.apache.commons.lang3.RandomStringUtils;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
> public class WordCount {
>     public static void main (String[] args) throws Exception{
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000L);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         String[] words1 = new String[]{
>                 "football",
>                 "soccer",
>                 "billiards",
>                 "snooker",
>                 "tennis",
>                 "handball",
>                 "basketball"
>         };
>         List<String> words = new ArrayList<>();
>         Random rnd = new Random();
>         for (int i = 0; i < 500000; i++) {
>             // nextInt's bound is exclusive, so use words1.length so every word can be chosen
>             words.add(words1[rnd.nextInt(words1.length)]);
>         }
>         DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
>         src.map(str -> str.toLowerCase())
>                 .flatMap(new Splitter())
>                 .returns(TypeInformation.of(new TypeHint<Tuple2<String,Integer>>(){}))
>                 .keyBy(0)
>                 .sum(1)
>                 .print();
>         env.execute();
>     }
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> }
> {code}
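> (Side note: the RocksDB/FsStateBackend imports above are unused; in this setup the state backend comes from the container args below. Setting it in code instead would look roughly like this, with the bucket path a placeholder:)
> {code:java}
> // Hypothetical in-code equivalent of -Dstate.backend=rocksdb and
> // -Dstate.checkpoints.dir=...; the boolean enables incremental checkpoints.
> env.setStateBackend(new RocksDBStateBackend("s3a://<whatever>/checkpoints", true));
> {code}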
> Job manager Kubernetes args:
>  It is a template, so disregard the placeholders.
> {code:java}
>        "job-cluster",
>        "--job-classname", "{classname}",
>        "-Djobmanager.rpc.address={cluster.name}-jobmanager",
>        "-Dparallelism.default=2",
>        "-Dblob.server.port=6124",
>        "-Dqueryable-state.server.ports=6125",
>        "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
>        "-Dstate.backend=rocksdb",
>        "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
>        "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
>        "-Dstate.backend.incremental=true"
> {code}
> Task manager Kubernetes args:
>  Again, templated
> {code:java}
>        ["task-manager",
>        "-Djobmanager.rpc.address={cluster.name}-jobmanager",
>        "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
>        "-Dstate.backend=rocksdb",
>        "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
>        "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
>        "-Dstate.backend.incremental=true"]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)