Posted to user@flink.apache.org by Samra Kasim <sa...@thehumangeo.com> on 2017/01/10 20:16:55 UTC

Reading and Writing to S3

Hi,

I am new to Flink and I've written two small test projects: 1) to read data
from s3 and 2) to push data to s3. However, I am getting two different
errors for the projects relating to, I think, how the core-site.xml file is
being read. I am running the project locally in IntelliJ. I have the
environment variable in run configurations set to
HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the
core-site.xml in the src/main/resources folder but get the same errors. I
want to know if my core-site.xml file is configured correctly for using s3a
and how to have IntelliJ read the core-site.xml file? Also, are the
core-site.xml configurations different for reading versus writing to s3?

This is my code for reading data from s3:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DesktopWriter {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");

        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access
key to Amazon S3. Please make sure to configure it by setting the
configuration key 'fs.s3.accessKey'.

This is my code for writing to S3:

import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test,
but the File System could not be initialized with that address: Unable to
load AWS credentials from any provider in the chain

This is my core-site.xml:

<configuration>

    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*****</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*****</value>
    </property>

</configuration>
This is my pom.xml:

<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>

</dependencies>

Thanks!
Sam

Re: Reading and Writing to S3

Posted by "M. Dale" <me...@yahoo.com>.
Samra,

As I was quickly looking at your code I only saw the ExecutionEnvironment for the read and not the StreamExecutionEnvironment for the write, so I'm glad to hear that this worked for batching. Like you, I am very much a Flink beginner who just happened to have tried out the batch write to S3; I have played around with the streaming examples, but not with output to S3. S3 is a key/object store: there is no append, only replace. So it seems you would need some sort of window function that writes the events of each time period as a batch, creating a new S3 object for every window. But I am just speculating, and hopefully someone else can shed more light on this.

Best of luck,
Markus
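
For what it's worth, a rough, untested sketch of the windowed batching Markus speculates about could look something like the following. The five-minute window, the object key pattern, and the S3Helper.putObject helper (which would have to be written separately, for example on top of the AWS SDK) are all assumptions made for this sketch; it is not an API Flink provides out of the box.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedS3WriteSketch {

    // Collects the stream into five-minute windows and emits one S3 object per window,
    // so nothing ever has to be appended to an existing object.
    public static void writeInWindows(DataStream<String> messageStream) {
        messageStream
                .timeWindowAll(Time.minutes(5))
                .apply(new AllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) {
                        // Concatenate the window's events into a single payload.
                        StringBuilder batch = new StringBuilder();
                        for (String value : values) {
                            batch.append(value).append('\n');
                        }
                        out.collect(batch.toString());
                    }
                })
                .addSink(new SinkFunction<String>() {
                    @Override
                    public void invoke(String batch) throws Exception {
                        // Hypothetical helper: one putObject call per window.
                        S3Helper.putObject("flink-test", "windows/window-" + System.currentTimeMillis() + ".txt", batch);
                    }
                });
    }
}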


Re: Reading and Writing to S3

Posted by Samra Kasim <sa...@thehumangeo.com>.
Hi Markus,

Thanks! This was very helpful! I realize what the issue is now. I followed
what you did and I am able to write data to s3 if I do batch processing,
but not stream processing. Do you know what the difference is and why it
would work for one and not the other?

Sam


Re: Reading and Writing to S3

Posted by "M. Dale" <me...@yahoo.com>.
Sam,
   Don't point the variables at files, point them at the directories containing the files. Do you have the fs.s3.impl property defined?

Concrete example:

/home/markus/hadoop-config directory has one file "core-site.xml" with the following content:

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

/home/markus/flink-config directory has one file "flink-conf.yaml" with the following content, pointing hadoopconf to the DIRECTORY containing core-site.xml:

fs.hdfs.hadoopconf: /home/markus/hadoop-config

In IntelliJ, go to Run - Edit Configurations - <your run configuration> and set the FLINK_CONF_DIR environment variable to point to the directory containing flink-conf.yaml (i.e. in my case /home/markus/flink-config). So everything is pointing to directories where the code looks for well-known filenames.

With that, the following works to write to S3 (maybe load events from a collection at first):

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute
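
Put together, a minimal, untested sketch of that batch write might look like the following. The class name, the example events, and the <bucket>/<prefix-dir> placeholders are just for illustration, and the job assumes the FLINK_CONF_DIR / fs.hdfs.hadoopconf setup described above so that the s3:// scheme resolves to the S3AFileSystem.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3BatchWriteTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load events from a small in-memory collection first, as suggested above,
        // before wiring in real sources.
        DataSet<String> events = env.fromElements("event-1", "event-2", "event-3");

        // Replace <bucket> and <prefix-dir> with a real bucket and key prefix.
        events.writeAsText("s3://<bucket>/<prefix-dir>").setParallelism(1);

        env.execute("s3-batch-write-test");
    }
}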


Re: Reading and Writing to S3

Posted by Samra Kasim <sa...@thehumangeo.com>.
Hi Markus,

Thanks for your help. I created an environment variable in IntelliJ for
FLINK_CONF_DIR to point to the flink-conf.yaml and in it defined
fs.hdfs.hadoopconf to point to the core-site.xml, but when I do that, I get
the error: java.io.IOException: No file system found with scheme s3,
referenced in file URI 's3://flink-test/flinkoutputtest.txt'.

I have been able to get it to work by using the environment variable
HADOOP_HOME to point directly to the core-site.xml, but when I do that and
I push data from Kafka, I can see the message stream printed to my
terminal, but no file gets saved to s3. I also don't see any errors. I have
the correct AWS access id and key because I am able to read from files on
s3 using Flink.

My code is below:

    public static void main(String[] args) throws Exception {

        Map<String, String> configs = ConfigUtils.loadConfigs("/path/to/src/main/resources/error-queue.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.print();

        messageStream.writeAsText("s3://flink-test/flinkoutputtest.txt").setParallelism(1);

        env.execute();
    }


Re: Reading and Writing to S3

Posted by "M. Dale" <me...@yahoo.com>.
Sam,
  I just happened to answer a similar question on Stackoverflow at "Does Apache Flink AWS S3 Sink require Hadoop for local testing?" (http://stackoverflow.com/questions/41388003/does-apache-flink-aws-s3-sink-require-hadoop-for-local-testing). I also submitted a PR to make that (for me) a little clearer in the Apache Flink documentation (https://github.com/apache/flink/pull/3054/files).
Let me know if that works for you.
Thanks,
Markus
