Posted to user@flink.apache.org by Arvid Heise <ar...@ververica.com> on 2020/01/08 13:31:09 UTC

Re: Flink Dataset to ParquetOutputFormat

Hi Anuj,

StreamingFileSink has a BucketAssigner that you can use for that purpose.

From the javadoc: The sink uses a BucketAssigner to determine in which
bucket directory each element should be written inside the base
directory. The BucketAssigner can, for example, use time or a property of
the element to determine the bucket directory. The default BucketAssigner
is a DateTimeBucketAssigner, which will create one new bucket every hour.
You can specify a custom BucketAssigner using the
setBucketAssigner(bucketAssigner) method, after calling forRowFormat(Path,
Encoder) or forBulkFormat(Path, BulkWriter.Factory).
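
For example, a date-based assigner might look like this rough sketch (I am
assuming your records are Avro GenericRecords carrying an epoch-millis field
named "timestamp"; adjust to your schema):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class DateBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        // Route each element into a dt=YYYY-MM-DD directory derived from
        // the record's own timestamp (assumed field, epoch millis).
        long epochMillis = (long) element.get("timestamp");
        return "dt=" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

You would then pass it to the builder after forBulkFormat(...), e.g.
.withBucketAssigner(new DateBucketAssigner()).build().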

If that doesn't work for you, please let me know. Btw, are you using event
or processing time?

Best,

Arvid

On Fri, Dec 27, 2019 at 4:24 AM vino yang <ya...@gmail.com> wrote:

> Hi Anuj,
>
> Actually, I am not familiar with partitioning by timestamp, but Flink's
> streaming BucketingSink provides this feature.[1] You may refer to that
> link and customize your sink.
>
> I can ping a committer who knows more details of the FS connector
> than I do; @kkloudas@gmail.com <kk...@gmail.com> may be able to help you.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink
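>
> For example, with BucketingSink a date-based layout can be a one-liner (a
> rough sketch only; the base path is made up, and note that DateTimeBucketer
> buckets by processing time):
>
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
> import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
>
> BucketingSink<GenericRecord> sink = new BucketingSink<>("s3://your-bucket/output");
> sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd")); // one directory per day
> // a Parquet-capable Writer would still need to be set via sink.setWriter(...)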
>
> aj <aj...@gmail.com> wrote on Fri, Dec 27, 2019 at 1:51 AM:
>
>> Thanks Vino.
>>
>> I am able to write data in Parquet now. But now the issue is how to write
>> a dataset to multiple output paths, partitioned by timestamp.
>> I want to partition the data by date.
>>
>> Currently I am writing like this, which writes to a single output path:
>>
>> DataSet<Tuple2<Void, GenericRecord>> df = allEvents
>>     .flatMap(new EventMapProcessor(schema.toString()))
>>     .withParameters(configuration);
>>
>> Job job = Job.getInstance();
>> AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
>> HadoopOutputFormat<Void, GenericRecord> parquetFormat =
>>     new HadoopOutputFormat<>(new AvroParquetOutputFormat(), job);
>> FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
>>
>> df.output(parquetFormat);
>> env.execute();
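>>
>> The closest I can get is to replace the single output with one filtered
>> output per date (a sketch; it assumes each record exposes a string "date"
>> field and that the list of dates is known up front):
>>
>> for (String date : dates) { // dates: the date strings covered by the input
>>     Job dateJob = Job.getInstance();
>>     AvroParquetOutputFormat.setSchema(dateJob, book_bike.getClassSchema());
>>     HadoopOutputFormat<Void, GenericRecord> dateFormat =
>>         new HadoopOutputFormat<>(new AvroParquetOutputFormat(), dateJob);
>>     FileOutputFormat.setOutputPath(dateJob,
>>         new Path(outputDirectory + "/dt=" + date));
>>     df.filter(t -> date.equals(String.valueOf(t.f1.get("date"))))
>>       .output(dateFormat);
>> }
>> env.execute();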
>>
>>
>> Please suggest.
>>
>> Thanks,
>> Anuj
>>
>> On Mon, Dec 23, 2019 at 12:59 PM vino yang <ya...@gmail.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> After searching on GitHub, I found a demo repository showing how to use
>>> Parquet with Flink.[1]
>>>
>>> You can have a look; I cannot say for sure whether it will help.
>>>
>>> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>>>
>>> Best,
>>> Vino
>>>
>>> aj <aj...@gmail.com> wrote on Sat, Dec 21, 2019 at 7:03 PM:
>>>
>>>> Hello All,
>>>>
>>>> I am getting a set of events in JSON that I am dumping into hourly
>>>> buckets in S3.
>>>> I am reading these hourly buckets and creating a DataSet<String>.
>>>>
>>>> I want to write this dataset as Parquet, but I am not able to figure
>>>> out how. Can somebody help me with this?
>>>>
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

Re: Flink Dataset to ParquetOutputFormat

Posted by Arvid Heise <ar...@ververica.com>.
Hi Anuj,

as far as I know, there is nothing like that on the DataSet side.

Could you implement your query on DataStream with bounded inputs?
In the long term, the DataSet API should be completely replaced by the
DataStream API.
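
For example, a bounded DataStream version of your job could look roughly like
this (a sketch only; JsonToRecordMapper stands in for your JSON-to-Avro
conversion, and the paths are made up):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // bulk formats roll part files on checkpoints

// readTextFile over the existing hourly directory yields a bounded stream.
DataStream<GenericRecord> records = env
        .readTextFile("s3://your-bucket/hourly-events/")
        .map(new JsonToRecordMapper(schema.toString()))  // placeholder mapper
        .returns(new GenericRecordAvroTypeInfo(schema)); // type info for GenericRecord

StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://your-bucket/parquet-out"),
                ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new DateBucketAssigner()) // e.g. the assigner sketched above
        .build();

records.addSink(sink);
env.execute("json-to-parquet");

One caveat: as far as I know the sink only finalizes part files on
checkpoints, so a bounded run may leave in-progress files at the very end.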

Best,

Arvid

On Thu, Jan 16, 2020 at 12:35 PM aj <aj...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for the detailed reply. I am using the DataSet API and it is a batch
> job, so I am wondering whether the option you provided works for that.
>
> Thanks,
> Anuj

Re: Flink Dataset to ParquetOutputFormat

Posted by aj <aj...@gmail.com>.
Hi Arvid,

Thanks for the detailed reply. I am using the DataSet API and it is a batch
job, so I am wondering whether the option you provided works for that.

Thanks,
Anuj


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>