Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2020/09/11 06:41:41 UTC

Struggling with reading the file from s3 as Source

Hi,

I want to *get data from S3, process it, and send it to Kinesis.*
1. Get gzip files from an S3 folder (s3://bucket/prefix)
2. Sort each file
3. Do some map/processing on each record in the file
4. Send to Kinesis

The idea (pseudocode; a runnable sketch follows below):
env.readTextFile(s3Folder)
.sort(SortFunction)
.map(MapFunction)
.sink(KinesisSink)
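A minimal runnable version of that shape, as a sketch only: it assumes Flink 1.11 with flink-connector-kinesis on the classpath and an S3 filesystem plugin installed; the region, stream name, and map body are placeholders, and step 2 (sorting) is left out because DataStream has no sort() operator, so per-file sorting would need a custom step.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class S3ToKinesisJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder producer config; the region is an assumption.
        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");

        FlinkKinesisProducer<String> kinesisSink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesisSink.setDefaultStream("my-output-stream"); // hypothetical stream name
        kinesisSink.setDefaultPartition("0");

        env.readTextFile("s3://bucket/prefix") // .gz files are decompressed based on extension
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String record) {
                   return record; // stand-in for the real per-record processing
               }
           })
           .addSink(kinesisSink);

        env.execute("s3-to-kinesis");
    }
}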

Struggling with reading the file from S3.
// Assume env is set up properly.
// The endpoint can be a single file or a directory: "s3://<bucket>/<endpoint>"
final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
stringDataStreamSource.print();

It keeps *erroring*, saying I need some kind of *HDFS* setup. I don't want
anything to do with HDFS; I just want to read from S3.
I saw a Stack Overflow mention, by David Anderson I think, about using the
Flink SQL API.
I would appreciate any decent example that gets reading from S3 working.

TIA,
Vijay

Re: Struggling with reading the file from s3 as Source

Posted by Vijay Balakrishnan <bv...@gmail.com>.
My problem was that the plugin jar needs to be under plugins/s3-fs-hadoop.
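For reference, each plugin has to sit in its own subdirectory under plugins/; the layout that works looks like this (version matching the 1.11.1 distribution used below):

flink-1.11.1/
└── plugins/
    └── s3-fs-hadoop/
        └── flink-s3-fs-hadoop-1.11.1.jar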
Got it running with the following:

Added to flink-conf.yaml:
s3.access-key:
s3.secret-key:

Removed all Hadoop dependencies from pom.xml.

cd /<flink-dir>
./bin/start-cluster.sh
./bin/flink run xyz.jar

Still struggling with how to get it to work with pom.xml in IntelliJ IDEA.
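One workaround that may help for IDE-only runs, offered as an assumption rather than the documented mechanism: the plugins/ directory is not picked up when launching from IntelliJ, so the filesystem factory has to be discoverable on the classpath instead. Adding the jar as a regular dependency can achieve that for local testing; on a real cluster the jar still belongs under plugins/:

<!-- For local IDE runs only (assumption); deploy via plugins/ on a cluster. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>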

Re: Struggling with reading the file from s3 as Source

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi Robert,
Thanks for the link.
Is there a simple example I can use as a starting template for using S3
with pom.xml?

I copied the flink-s3-fs-hadoop-1.11.1.jar into the plugins/s3-fs-hadoop
directory
Running from flink-1.11.1/
flink run -cp ../target/monitoring-rules-influx-1.0.jar -jar
/Users/vkbalakr/work/flink-examples/understanding-apache-flink/03-processing-infinite-streams-of-data/monitoring-rules-influx/target/monitoring-rules-influx-1.0.jar

Caused by: java.io.IOException: *Cannot find any jar files for plugin in
directory [/Users/vkbalakr/flink/flink-1.11.1 2/plugins/s3-fs-hadoop]. *
Please provide the jar files for the plugin or delete the directory.
at
org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:97)

*IDEA*: (I copied the flink-s3-fs-hadoop-1.11.1.jar into the
plugins/s3-fs-hadoop directory)
*How do I wire that up in the pom.xml so it runs inside IntelliJ, which
points to the Apache repo?*
pom.xml:
Added Hadoop dependencies:
<dependencies>
    <!-- Hadoop dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

This gives:
Exception in thread "main" java.lang.IllegalStateException: *No
ExecutorFactory found to execute the application.*
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
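Likely relevant to this particular error, and worth checking against the Flink 1.11 release notes: flink-clients stopped being a transitive dependency of flink-streaming-java in 1.11, and without it the DefaultExecutorServiceLoader finds no ExecutorFactory. Adding it explicitly should restore local execution:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>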

TIA,


Re: Struggling with reading the file from s3 as Source

Posted by Robert Metzger <rm...@apache.org>.
Hi Vijay,

Can you post the error you are referring to?
Did you properly set up an S3 plugin
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/)?
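For reference, the setup described on that page looks roughly like this (version and paths are examples for a 1.11.1 distribution; the S3 jars ship in the distribution's opt/ directory):

cd flink-1.11.1
mkdir -p plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.11.1.jar plugins/s3-fs-hadoop/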
