You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Harshvardhan Shinde <ha...@oyorooms.com> on 2021/09/24 09:52:32 UTC

Write Streaming data to S3 in Parquet files

Hi,
I wanted to know if we can write streaming data to S3 in parquet format
with partitioning.
Here's what I want to achieve:
I have a kafka table which gets updated with the data from kafka topic and
I'm using select statement to get the data into a Table and converting into
a stream as:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.sqlQuery("Select * from test");
DataStream<Row> stream = tableEnv.toDataStream(table);

Now I want to write this stream to S3 in parquet files with hourly
partitions.

Here are my questions:
1. Is this possible?
2. If yes, how it can be achieved or link to appropriate documentation.

Thanks and Regards,
Harshvardhan

Re: Write Streaming data to S3 in Parquet files

Posted by Guowei Ma <gu...@gmail.com>.
Hi,Harshvardhan

I think you could use some factory such as `ParquetAvroWriters.forXXXX`
form `ParquetAvroWriters.java` [1].
And you could see more same class in the package
`org.apache.flink.formats.parquet.`

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java

Best,
Guowei


On Mon, Sep 27, 2021 at 2:36 AM Harshvardhan Shinde <
harshvardhan.shinde@oyorooms.com> wrote:

> Hi,
>
> Thanks for the response.
>
> How can this streaming data be written to S3 for the path to be given?
> Also I see that the FileSink takes GenericRecord, so how can the
> DataStream be converted to a GenericRecord?
>
> Please bear with me if my questions don't make any sense.
>
> On Sun, Sep 26, 2021 at 9:12 AM Guowei Ma <gu...@gmail.com> wrote:
>
>> Hi, Harshvardhan
>>
>> I think CaiZhi is right.
>> I only have a small addition. Because I see that you want to convert
>> Table to DataStream, you can look at FileSink (ParquetWriterFactory)[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats
>>
>> Best,
>> Guowei
>>
>>
>> On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng <ts...@gmail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> Try the PARTITIONED BY clause. See
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>>>
>>> Harshvardhan Shinde <ha...@oyorooms.com> 于2021年9月24日周五
>>> 下午5:52写道:
>>>
>>>> Hi,
>>>> I wanted to know if we can write streaming data to S3 in parquet format
>>>> with partitioning.
>>>> Here's what I want to achieve:
>>>> I have a kafka table which gets updated with the data from kafka topic
>>>> and I'm using select statement to get the data into a Table and converting
>>>> into a stream as:
>>>>
>>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>> Table table = tableEnv.sqlQuery("Select * from test");
>>>> DataStream<Row> stream = tableEnv.toDataStream(table);
>>>>
>>>> Now I want to write this stream to S3 in parquet files with hourly
>>>> partitions.
>>>>
>>>> Here are my questions:
>>>> 1. Is this possible?
>>>> 2. If yes, how it can be achieved or link to appropriate documentation.
>>>>
>>>> Thanks and Regards,
>>>> Harshvardhan
>>>>
>>>>
>
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform
>

Re: Write Streaming data to S3 in Parquet files

Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
Hi,

Thanks for the response.

How can this streaming data be written to S3 for the path to be given?
Also I see that the FileSink takes GenericRecord, so how can the DataStream
be converted to a GenericRecord?

Please bear with me if my questions don't make any sense.

On Sun, Sep 26, 2021 at 9:12 AM Guowei Ma <gu...@gmail.com> wrote:

> Hi, Harshvardhan
>
> I think CaiZhi is right.
> I only have a small addition. Because I see that you want to convert Table
> to DataStream, you can look at FileSink (ParquetWriterFactory)[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats
>
> Best,
> Guowei
>
>
> On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> Try the PARTITIONED BY clause. See
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>>
>> Harshvardhan Shinde <ha...@oyorooms.com> 于2021年9月24日周五
>> 下午5:52写道:
>>
>>> Hi,
>>> I wanted to know if we can write streaming data to S3 in parquet format
>>> with partitioning.
>>> Here's what I want to achieve:
>>> I have a kafka table which gets updated with the data from kafka topic
>>> and I'm using select statement to get the data into a Table and converting
>>> into a stream as:
>>>
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>> Table table = tableEnv.sqlQuery("Select * from test");
>>> DataStream<Row> stream = tableEnv.toDataStream(table);
>>>
>>> Now I want to write this stream to S3 in parquet files with hourly
>>> partitions.
>>>
>>> Here are my questions:
>>> 1. Is this possible?
>>> 2. If yes, how it can be achieved or link to appropriate documentation.
>>>
>>> Thanks and Regards,
>>> Harshvardhan
>>>
>>>

-- 
Thanks and Regards,
Harshvardhan
Data Platform

Re: Write Streaming data to S3 in Parquet files

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Harshvardhan

I think CaiZhi is right.
I only have a small addition. Because I see that you want to convert Table
to DataStream, you can look at FileSink (ParquetWriterFactory)[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats

Best,
Guowei


On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> Try the PARTITIONED BY clause. See
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>
> Harshvardhan Shinde <ha...@oyorooms.com> 于2021年9月24日周五
> 下午5:52写道:
>
>> Hi,
>> I wanted to know if we can write streaming data to S3 in parquet format
>> with partitioning.
>> Here's what I want to achieve:
>> I have a kafka table which gets updated with the data from kafka topic
>> and I'm using select statement to get the data into a Table and converting
>> into a stream as:
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>> Table table = tableEnv.sqlQuery("Select * from test");
>> DataStream<Row> stream = tableEnv.toDataStream(table);
>>
>> Now I want to write this stream to S3 in parquet files with hourly
>> partitions.
>>
>> Here are my questions:
>> 1. Is this possible?
>> 2. If yes, how it can be achieved or link to appropriate documentation.
>>
>> Thanks and Regards,
>> Harshvardhan
>>
>>

Re: Write Streaming data to S3 in Parquet files

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Try the PARTITIONED BY clause. See
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/

Harshvardhan Shinde <ha...@oyorooms.com> 于2021年9月24日周五
下午5:52写道:

> Hi,
> I wanted to know if we can write streaming data to S3 in parquet format
> with partitioning.
> Here's what I want to achieve:
> I have a kafka table which gets updated with the data from kafka topic and
> I'm using select statement to get the data into a Table and converting into
> a stream as:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Table table = tableEnv.sqlQuery("Select * from test");
> DataStream<Row> stream = tableEnv.toDataStream(table);
>
> Now I want to write this stream to S3 in parquet files with hourly
> partitions.
>
> Here are my questions:
> 1. Is this possible?
> 2. If yes, how it can be achieved or link to appropriate documentation.
>
> Thanks and Regards,
> Harshvardhan
>
>