Posted to user@flink.apache.org by John Smith <ja...@gmail.com> on 2020/07/27 14:53:06 UTC

How to stream CSV from S3?

Hi, using Flink 1.10

1- How do we go about reading CSV files that are copied to S3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is
copied to S3?
3- Is that part of the table APIs?

Re: How to stream CSV from S3?

Posted by Jingsong Li <ji...@gmail.com>.
Yes, you can try `StreamExecutionEnvironment.readFile(RowCsvInputFormat,
filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)`. (And
wrap it in a Table if you want.)
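To sketch the "wrap it in a Table" step: a minimal example under the 1.10 Table API. The sample rows, field types, and view name are made up for illustration; the `fromElements` stream stands in for the `DataStream<Row>` that `readFile` would produce.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Stand-in for the DataStream<Row> that env.readFile(RowCsvInputFormat, ...) produces
DataStream<Row> lines = env
      .fromElements(Row.of("a", "1"), Row.of("b", "2"))
      .returns(new RowTypeInfo(Types.STRING, Types.STRING));

// Wrap the stream as a Table; columns default to f0, f1, ...
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table csvTable = tEnv.fromDataStream(lines);
tEnv.createTemporaryView("csv_input", csvTable);

// The continuously read CSV rows are now queryable with SQL
Table filtered = tEnv.sqlQuery("SELECT f0 FROM csv_input WHERE f1 = '1'");
```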


-- 
Best, Jingsong Lee

Re: How to stream CSV from S3?

Posted by John Smith <ja...@gmail.com>.
Hi, yes, it works :)

For the Java guys...

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;

final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.getExecutionEnvironment();

// Also works with an S3 path such as "s3://bucket/prefix"
String path = "file:///foo/bar";

// One entry per CSV column
TypeInformation[] fieldTypes = new TypeInformation[]{
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO
};

RowCsvInputFormat csvFormat =
      new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);

// Re-scan the directory every 1000 ms and pick up newly added files
DataStreamSource<Row> lines = env.readFile(csvFormat, path,
      FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);

lines.print();

env.execute("stream-csv-from-s3");


>

Re: How to stream CSV from S3?

Posted by Arvid Heise <ar...@ververica.com>.
Hi John,

I found an example on SO [1] in Scala.

[1] https://stackoverflow.com/a/52093079/10299342


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: How to stream CSV from S3?

Posted by John Smith <ja...@gmail.com>.
Hi, is there an example on how RowCsvInputFormat is initialized?


Re: How to stream CSV from S3?

Posted by Jingsong Li <ji...@gmail.com>.
- `env.readCsvFile` is part of the DataSet API; it just reads the full data
set once in batch mode.
- `streamEnv.readFile(RowCsvInputFormat, filePath,
FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a
directory and keep reading in streaming mode.


-- 
Best, Jingsong Lee

Re: How to stream CSV from S3?

Posted by John Smith <ja...@gmail.com>.
Also, this is where I find the docs confusing: in the "connectors" section,
the file system isn't listed under data streaming, but env.readCsvFile seems
like it can do the trick?


Re: How to stream CSV from S3?

Posted by John Smith <ja...@gmail.com>.
Basically, I want to "monitor" a bucket on S3 and, for every file that gets
created in that bucket, read it and stream it.

If I understand correctly, I can just use env.readCsvFile() and configure it
to continuously read a folder path?



Re: How to stream CSV from S3?

Posted by Jingsong Li <ji...@gmail.com>.
Hi John,

Do you mean you want to read the S3 CSV files using partition/bucket pruning?

If you are just using the DataSet API, you can use CsvInputFormat to read CSV
files.

If you want to use the Table/SQL API: in 1.10, the CSV format for tables does
not support partitioned tables, so the only option is to specify the
partition/bucket path explicitly and read that single directory.

In 1.11, the Table/SQL filesystem connector with the CSV format supports
partitioned tables, with complete partition semantics.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
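For the 1.11 route, a hedged sketch of declaring such a partitioned CSV table from Java; the table name, columns, and bucket path are invented for illustration and would need to match your actual layout.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Flink 1.11+; names and the bucket path are illustrative only
TableEnvironment tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build());

// Partition columns (here dt) must appear in the schema
tEnv.executeSql(
      "CREATE TABLE csv_source (" +
      "  user_id STRING," +
      "  amount  STRING," +
      "  dt      STRING" +
      ") PARTITIONED BY (dt) WITH (" +
      "  'connector' = 'filesystem'," +
      "  'path'      = 's3://my-bucket/my-prefix'," +
      "  'format'    = 'csv'" +
      ")");

// Queries that filter on dt can then benefit from partition pruning
Table pruned = tEnv.sqlQuery(
      "SELECT user_id, amount FROM csv_source WHERE dt = '2020-07-28'");
```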

Best,
Jingsong



-- 
Best, Jingsong Lee