You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Rafi Aroch <ra...@gmail.com> on 2018/10/25 11:08:40 UTC

BucketingSink capabilities for DataSet API

Hi,

I'm writing a Batch job which reads Parquet, does some aggregations and
writes back as Parquet files.
I would like the output to be partitioned by year, month, day by event
time. Similarly to the functionality of the BucketingSink.

I was able to achieve the reading/writing to/from Parquet by using the
hadoop-compatibility features.
I couldn't find a way to partition the data by year, month, day to create a
folder hierarchy accordingly. Everything is written to a single directory.

I could find an unanswered question about this issue:
https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit

Can anyone suggest a way to achieve this? Maybe there's a way to integrate
the BucketingSink with the DataSet API? Another solution?

Rafi

Re: BucketingSink capabilities for DataSet API

Posted by aj <aj...@gmail.com>.
Thanks, Rafi. I will try with this but yes if partitioning is not possible
then I also have to look some other solution.

On Wed, Feb 19, 2020 at 3:44 PM Rafi Aroch <ra...@gmail.com> wrote:

> Hi Anuj,
>
> It's been a while since I wrote this (Flink 1.5.2). Could be a
> better/newer way, but this is what how I read & write Parquet with
> hadoop-compatibility:
>
> // imports
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>>
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
>
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.parquet.avro.AvroParquetInputFormat;
>>
>> // Creating Parquet input format
>> Configuration conf = new Configuration();
>> Job job = Job.getInstance(conf);
>> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
>> AvroParquetInputFormat<>();
>> AvroParquetInputFormat.setInputDirRecursive(job, true);
>> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
>> HadoopInputFormat<Void, GenericRecord> inputFormat
>> = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
>> GenericRecord.class, job);
>>
>
>
>> // Creating Parquet output format
>> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
>> AvroParquetOutputFormat<>();
>> AvroParquetOutputFormat.setSchema(job, new
>> Schema.Parser().parse(SomeEvent.SCHEMA));
>> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>> AvroParquetOutputFormat.setCompressOutput(job, true);
>> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
>> HadoopOutputFormat<Void, GenericRecord> outputFormat = new
>> HadoopOutputFormat<>(parquetOutputFormat, job);
>
>
>
> DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
>> env.createInput(inputFormat);
>
>
>
> // Start processing...
>
>
>
> // Writing result as Parquet
>> resultDataSet.output(outputFormat);
>
>
> Regarding writing partitioned data, as far as I know, there is no way to
> achieve that with the DataSet API with hadoop-compatibility.
>
> You could implement this with reading from input files as stream and then
> using StreamingFileSink with a custom BucketAssigner [1].
> The problem with that (which was not yet resolved AFAIK) is described here
> [2] in "Important Notice 2".
>
> Sadly I say, that eventually, for this use-case I chose Spark to do the
> job...
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
>
> Hope this helps.
>
> Rafi
>
>
> On Sat, Feb 15, 2020 at 5:03 PM aj <aj...@gmail.com> wrote:
>
>> Hi Rafi,
>>
>> I have a similar use case where I want to read parquet files in the
>> dataset and want to perform some transformation and similarly want to write
>> the result using year month day partitioned.
>>
>> I am stuck at first step only where how to read and write Parquet files
>> using hadoop-Compatability.
>>
>> Please help me with this and also if u find the solution for how to write
>> data in partitioned.
>>
>> Thanks,
>> Anuj
>>
>>
>> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <an...@data-artisans.com>
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> At the moment I do not see any support of Parquet in DataSet API
>>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>>> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 25 Oct 2018, at 13:08, Rafi Aroch <ra...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a Batch job which reads Parquet, does some aggregations and
>>> writes back as Parquet files.
>>> I would like the output to be partitioned by year, month, day by event
>>> time. Similarly to the functionality of the BucketingSink.
>>>
>>> I was able to achieve the reading/writing to/from Parquet by using the
>>> hadoop-compatibility features.
>>> I couldn't find a way to partition the data by year, month, day to
>>> create a folder hierarchy accordingly. Everything is written to a single
>>> directory.
>>>
>>> I could find an unanswered question about this issue:
>>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>>
>>> Can anyone suggest a way to achieve this? Maybe there's a way to
>>> integrate the BucketingSink with the DataSet API? Another solution?
>>>
>>> Rafi
>>>
>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: BucketingSink capabilities for DataSet API

Posted by aj <aj...@gmail.com>.
Thanks, Timo. I have not used and explore Table API until now. I have used
dataset and datastream API only.
I will read about the Table API.

On Wed, Feb 19, 2020 at 4:33 PM Timo Walther <tw...@apache.org> wrote:

> Hi Anuj,
>
> another option would be to use the new Hive connectors. Have you looked
> into those? They might work on SQL internal data types which is why you
> would need to use the Table API then.
>
> Maybe Bowen in CC can help you here.
>
> Regards,
> Timo
>
> On 19.02.20 11:14, Rafi Aroch wrote:
> > Hi Anuj,
> >
> > It's been a while since I wrote this (Flink 1.5.2). Could be a
> > better/newer way, but this is what how I read & write Parquet with
> > hadoop-compatibility:
> >
> >     // imports
> >     import org.apache.avro.generic.GenericRecord;
> >     import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> >
> >     import
> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> >
> >     import org.apache.flink.hadoopcompatibility.HadoopInputs;
> >     import org.apache.hadoop.conf.Configuration;
> >     import org.apache.hadoop.fs.Path;
> >     import org.apache.hadoop.mapreduce.Job;
> >     import org.apache.parquet.avro.AvroParquetInputFormat;
> >
> >     // Creating Parquet input format
> >     Configuration conf = new Configuration();
> >     Job job = Job.getInstance(conf);
> >     AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
> >     AvroParquetInputFormat<>();
> >     AvroParquetInputFormat.setInputDirRecursive(job, true);
> >     AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> >     HadoopInputFormat<Void, GenericRecord> inputFormat
> >     = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> >     GenericRecord.class, job);
> >
> >     // Creating Parquet output format
> >     AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
> >     AvroParquetOutputFormat<>();
> >     AvroParquetOutputFormat.setSchema(job, new
> >     Schema.Parser().parse(SomeEvent.SCHEMA));
> >     AvroParquetOutputFormat.setCompression(job,
> >     CompressionCodecName.SNAPPY);
> >     AvroParquetOutputFormat.setCompressOutput(job, true);
> >     AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> >     HadoopOutputFormat<Void, GenericRecord> outputFormat = new
> >     HadoopOutputFormat<>(parquetOutputFormat, job);
> >
> >     DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
> >     env.createInput(inputFormat);
> >
> >     // Start processing...
> >
> >     // Writing result as Parquet
> >     resultDataSet.output(outputFormat);
> >
> >
> > Regarding writing partitioned data, as far as I know, there is no way to
> > achieve that with the DataSet API with hadoop-compatibility.
> >
> > You could implement this with reading from input files as stream and
> > then using StreamingFileSink with a custom BucketAssigner [1].
> > The problem with that (which was not yet resolved AFAIK) is described
> > here [2] in "Important Notice 2".
> >
> > Sadly I say, that eventually, for this use-case I chose Spark to do the
> > job...
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> >
> > Hope this helps.
> >
> > Rafi
> >
> >
> > On Sat, Feb 15, 2020 at 5:03 PM aj <ajainjecrc@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     Hi Rafi,
> >
> >     I have a similar use case where I want to read parquet files in the
> >     dataset and want to perform some transformation and similarly want
> >     to write the result using year month day partitioned.
> >
> >     I am stuck at first step only where how to read and write
> >     Parquet files using hadoop-Compatability.
> >
> >     Please help me with this and also if u find the solution for how to
> >     write data in partitioned.
> >
> >     Thanks,
> >     Anuj
> >
> >     On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin
> >     <andrey@data-artisans.com <ma...@data-artisans.com>> wrote:
> >
> >         Hi Rafi,
> >
> >         At the moment I do not see any support of Parquet in DataSet API
> >         except HadoopOutputFormat, mentioned in stack overflow question.
> >         I have cc’ed Fabian and Aljoscha, maybe they could provide more
> >         information.
> >
> >         Best,
> >         Andrey
> >
> >>         On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.aroch@gmail.com
> >>         <ma...@gmail.com>> wrote:
> >>
> >>         Hi,
> >>
> >>         I'm writing a Batch job which reads Parquet, does some
> >>         aggregations and writes back as Parquet files.
> >>         I would like the output to be partitioned by year, month, day
> >>         by event time. Similarly to the functionality of the
> >>         BucketingSink.
> >>
> >>         I was able to achieve the reading/writing to/from Parquet by
> >>         using the hadoop-compatibility features.
> >>         I couldn't find a way to partition the data by year, month,
> >>         day to create a folder hierarchy accordingly. Everything is
> >>         written to a single directory.
> >>
> >>         I could find an unanswered question about this issue:
> >>
> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
> >>
> >>         Can anyone suggest a way to achieve this? Maybe there's a way
> >>         to integrate the BucketingSink with the DataSet API? Another
> >>         solution?
> >>
> >>         Rafi
> >
> >
> >
> >     --
> >     Thanks & Regards,
> >     Anuj Jain
> >     Mob. : +91- 8588817877
> >     Skype : anuj.jain07
> >     ****<http://www.oracle.com/>
> >
> >
> >     <http://www.cse.iitm.ac.in/%7Eanujjain/>
> >
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: BucketingSink capabilities for DataSet API

Posted by Timo Walther <tw...@apache.org>.
Hi Anuj,

another option would be to use the new Hive connectors. Have you looked 
into those? They might work on SQL internal data types which is why you 
would need to use the Table API then.

Maybe Bowen in CC can help you here.

Regards,
Timo

On 19.02.20 11:14, Rafi Aroch wrote:
> Hi Anuj,
> 
> It's been a while since I wrote this (Flink 1.5.2). Could be a 
> better/newer way, but this is what how I read & write Parquet with 
> hadoop-compatibility:
> 
>     // imports
>     import org.apache.avro.generic.GenericRecord;
>     import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> 
>     import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; 
> 
>     import org.apache.flink.hadoopcompatibility.HadoopInputs;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.mapreduce.Job;
>     import org.apache.parquet.avro.AvroParquetInputFormat;
> 
>     // Creating Parquet input format
>     Configuration conf = new Configuration();
>     Job job = Job.getInstance(conf);
>     AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
>     AvroParquetInputFormat<>();
>     AvroParquetInputFormat.setInputDirRecursive(job, true);
>     AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
>     HadoopInputFormat<Void, GenericRecord> inputFormat
>     = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
>     GenericRecord.class, job);
> 
>     // Creating Parquet output format
>     AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
>     AvroParquetOutputFormat<>();
>     AvroParquetOutputFormat.setSchema(job, new
>     Schema.Parser().parse(SomeEvent.SCHEMA));
>     AvroParquetOutputFormat.setCompression(job,
>     CompressionCodecName.SNAPPY);
>     AvroParquetOutputFormat.setCompressOutput(job, true);
>     AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
>     HadoopOutputFormat<Void, GenericRecord> outputFormat = new
>     HadoopOutputFormat<>(parquetOutputFormat, job); 
> 
>     DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
>     env.createInput(inputFormat); 
> 
>     // Start processing... 
> 
>     // Writing result as Parquet
>     resultDataSet.output(outputFormat);
> 
> 
> Regarding writing partitioned data, as far as I know, there is no way to 
> achieve that with the DataSet API with hadoop-compatibility.
> 
> You could implement this with reading from input files as stream and 
> then using StreamingFileSink with a custom BucketAssigner [1].
> The problem with that (which was not yet resolved AFAIK) is described 
> here [2] in "Important Notice 2".
> 
> Sadly I say, that eventually, for this use-case I chose Spark to do the 
> job...
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> 
> Hope this helps.
> 
> Rafi
> 
> 
> On Sat, Feb 15, 2020 at 5:03 PM aj <ajainjecrc@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Hi Rafi,
> 
>     I have a similar use case where I want to read parquet files in the
>     dataset and want to perform some transformation and similarly want
>     to write the result using year month day partitioned.
> 
>     I am stuck at first step only where how to read and write
>     Parquet files using hadoop-Compatability.
> 
>     Please help me with this and also if u find the solution for how to
>     write data in partitioned.
> 
>     Thanks,
>     Anuj
> 
>     On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin
>     <andrey@data-artisans.com <ma...@data-artisans.com>> wrote:
> 
>         Hi Rafi,
> 
>         At the moment I do not see any support of Parquet in DataSet API
>         except HadoopOutputFormat, mentioned in stack overflow question.
>         I have cc’ed Fabian and Aljoscha, maybe they could provide more
>         information.
> 
>         Best,
>         Andrey
> 
>>         On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.aroch@gmail.com
>>         <ma...@gmail.com>> wrote:
>>
>>         Hi,
>>
>>         I'm writing a Batch job which reads Parquet, does some
>>         aggregations and writes back as Parquet files.
>>         I would like the output to be partitioned by year, month, day
>>         by event time. Similarly to the functionality of the
>>         BucketingSink.
>>
>>         I was able to achieve the reading/writing to/from Parquet by
>>         using the hadoop-compatibility features.
>>         I couldn't find a way to partition the data by year, month,
>>         day to create a folder hierarchy accordingly. Everything is
>>         written to a single directory.
>>
>>         I could find an unanswered question about this issue:
>>         https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>
>>         Can anyone suggest a way to achieve this? Maybe there's a way
>>         to integrate the BucketingSink with the DataSet API? Another
>>         solution?
>>
>>         Rafi
> 
> 
> 
>     -- 
>     Thanks & Regards,
>     Anuj Jain
>     Mob. : +91- 8588817877
>     Skype : anuj.jain07
>     ****<http://www.oracle.com/>
> 
> 
>     <http://www.cse.iitm.ac.in/%7Eanujjain/>
> 


Re: BucketingSink capabilities for DataSet API

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Anuj,

It's been a while since I wrote this (Flink 1.5.2). Could be a better/newer
way, but this is what how I read & write Parquet with hadoop-compatibility:

// imports
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;

import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.parquet.avro.AvroParquetInputFormat;
>
> // Creating Parquet input format
> Configuration conf = new Configuration();
> Job job = Job.getInstance(conf);
> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
> AvroParquetInputFormat<>();
> AvroParquetInputFormat.setInputDirRecursive(job, true);
> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> HadoopInputFormat<Void, GenericRecord> inputFormat
> = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> GenericRecord.class, job);
>


> // Creating Parquet output format
> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
> AvroParquetOutputFormat<>();
> AvroParquetOutputFormat.setSchema(job, new
> Schema.Parser().parse(SomeEvent.SCHEMA));
> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> AvroParquetOutputFormat.setCompressOutput(job, true);
> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> HadoopOutputFormat<Void, GenericRecord> outputFormat = new
> HadoopOutputFormat<>(parquetOutputFormat, job);



DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
> env.createInput(inputFormat);



// Start processing...



// Writing result as Parquet
> resultDataSet.output(outputFormat);


Regarding writing partitioned data, as far as I know, there is no way to
achieve that with the DataSet API with hadoop-compatibility.

You could implement this with reading from input files as stream and then
using StreamingFileSink with a custom BucketAssigner [1].
The problem with that (which was not yet resolved AFAIK) is described here
[2] in "Important Notice 2".

Sadly I say, that eventually, for this use-case I chose Spark to do the
job...

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general

Hope this helps.

Rafi


On Sat, Feb 15, 2020 at 5:03 PM aj <aj...@gmail.com> wrote:

> Hi Rafi,
>
> I have a similar use case where I want to read parquet files in the
> dataset and want to perform some transformation and similarly want to write
> the result using year month day partitioned.
>
> I am stuck at first step only where how to read and write Parquet files
> using hadoop-Compatability.
>
> Please help me with this and also if u find the solution for how to write
> data in partitioned.
>
> Thanks,
> Anuj
>
>
> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <an...@data-artisans.com>
> wrote:
>
>> Hi Rafi,
>>
>> At the moment I do not see any support of Parquet in DataSet API
>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>>
>> Best,
>> Andrey
>>
>> On 25 Oct 2018, at 13:08, Rafi Aroch <ra...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm writing a Batch job which reads Parquet, does some aggregations and
>> writes back as Parquet files.
>> I would like the output to be partitioned by year, month, day by event
>> time. Similarly to the functionality of the BucketingSink.
>>
>> I was able to achieve the reading/writing to/from Parquet by using the
>> hadoop-compatibility features.
>> I couldn't find a way to partition the data by year, month, day to create
>> a folder hierarchy accordingly. Everything is written to a single directory.
>>
>> I could find an unanswered question about this issue:
>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>
>> Can anyone suggest a way to achieve this? Maybe there's a way to
>> integrate the BucketingSink with the DataSet API? Another solution?
>>
>> Rafi
>>
>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Re: BucketingSink capabilities for DataSet API

Posted by aj <aj...@gmail.com>.
Hi Rafi,

I have a similar use case where I want to read parquet files in the dataset
and want to perform some transformation and similarly want to write the
result using year month day partitioned.

I am stuck at first step only where how to read and write Parquet files
using hadoop-Compatability.

Please help me with this and also if u find the solution for how to write
data in partitioned.

Thanks,
Anuj


On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <an...@data-artisans.com>
wrote:

> Hi Rafi,
>
> At the moment I do not see any support of Parquet in DataSet API
> except HadoopOutputFormat, mentioned in stack overflow question. I have
> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>
> Best,
> Andrey
>
> On 25 Oct 2018, at 13:08, Rafi Aroch <ra...@gmail.com> wrote:
>
> Hi,
>
> I'm writing a Batch job which reads Parquet, does some aggregations and
> writes back as Parquet files.
> I would like the output to be partitioned by year, month, day by event
> time. Similarly to the functionality of the BucketingSink.
>
> I was able to achieve the reading/writing to/from Parquet by using the
> hadoop-compatibility features.
> I couldn't find a way to partition the data by year, month, day to create
> a folder hierarchy accordingly. Everything is written to a single directory.
>
> I could find an unanswered question about this issue:
> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>
> Can anyone suggest a way to achieve this? Maybe there's a way to integrate
> the BucketingSink with the DataSet API? Another solution?
>
> Rafi
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: BucketingSink capabilities for DataSet API

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi Rafi,

At the moment I do not see any support of Parquet in DataSet API except HadoopOutputFormat, mentioned in stack overflow question. I have cc’ed Fabian and Aljoscha, maybe they could provide more information.

Best,
Andrey

> On 25 Oct 2018, at 13:08, Rafi Aroch <ra...@gmail.com> wrote:
> 
> Hi,
> 
> I'm writing a Batch job which reads Parquet, does some aggregations and writes back as Parquet files.
> I would like the output to be partitioned by year, month, day by event time. Similarly to the functionality of the BucketingSink.
> 
> I was able to achieve the reading/writing to/from Parquet by using the hadoop-compatibility features.
> I couldn't find a way to partition the data by year, month, day to create a folder hierarchy accordingly. Everything is written to a single directory.
> 
> I could find an unanswered question about this issue: https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit <https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit>
> 
> Can anyone suggest a way to achieve this? Maybe there's a way to integrate the BucketingSink with the DataSet API? Another solution?
> 
> Rafi