Posted to dev@flink.apache.org by Enrico Agnoli <en...@workday.com> on 2019/08/30 08:23:52 UTC

[DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations

StreamingFile limitations

Hi community,

I'm working on porting our code from `BucketingSink<>` to `StreamingFileSink`.
In this case we use the sink to write Avro records as Parquet, and the suggested implementation of the sink is something like:

```
val parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(mySchemaClass)
StreamingFileSink.forBulkFormat(basePath, parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner)
```

In this design the BucketAssigner is chained after the `forBulkFormat(..)` step, so both operate on the same element type. My problem is that I have an object that contains the information needed to construct the path, plus a sub-object that contains the data to serialize. A simple example:

myClass
|- country
|- cityClass (extends SpecificRecordBase)

Let's say I receive a stream of myClass and I want to serialize the cityClass data via the logic above. The problem is that `forBulkFormat(..)` needs to run on a subtype of `SpecificRecordBase`, so myClass doesn't work.
If I extract cityClass from myClass, then country is no longer available in `withBucketAssigner(..)`, so I cannot store the data in the right folder...


Am I missing something, or do I have to write my own version of the `ParquetBulkWriter<T>` class to be able to handle `myClass`?

Thanks for any ideas and suggestions.
Enrico

Re: [DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations

Posted by Enrico Agnoli <en...@workday.com>.
Thanks for confirming.
We have a 
```
public class ParquetSinkWriter implements Writer<myClass>
```
that handles the serialization of the data.
We implemented it starting from:
https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
https://stackoverflow.com/questions/48098011/how-to-use-apache-flink-write-parquet-file-on-hdfs-by-datetime-partition


On 2019/09/09 09:31:03, Kostas Kloudas <kk...@gmail.com> wrote: 
> Hi Enrico,
> 
> Sorry for the late reply. I think your understanding is correct.
> The best way to do it is to write your own ParquetBulkWriter and the
> corresponding factory.
> 
> Out of curiosity, I guess that in the BucketingSink you were using the
> AvroKeyValueSinkWriter, right?
> 
> Cheers,
> Kostas

Re: [DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Enrico,

Sorry for the late reply. I think your understanding is correct.
The best way to do it is to write your own ParquetBulkWriter and the
corresponding factory.
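
One possible shape for such a writer and factory is a thin wrapper that accepts the outer type and forwards only the inner record to an existing writer. The sketch below is dependency-free so it runs on its own: `BulkWriter` and `BulkWriterFactory` are stand-ins that mirror the method names of Flink's `BulkWriter<T>` and `BulkWriter.Factory<T>`, while `MyClass`, `City`, `ExtractingWriterFactory`, `bucketId` and the line-based inner writer are hypothetical names used only for illustration. It is not taken from the Flink code base.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class DelegatingBulkWriterSketch {

    // Stand-in for Flink's BulkWriter<T> (same three method names).
    interface BulkWriter<T> {
        void addElement(T element) throws IOException;
        void flush() throws IOException;
        void finish() throws IOException;
    }

    // Stand-in for BulkWriter.Factory<T>; Flink passes an FSDataOutputStream here.
    interface BulkWriterFactory<T> {
        BulkWriter<T> create(OutputStream out) throws IOException;
    }

    // Hypothetical equivalents of cityClass and myClass from the thread.
    static final class City {
        final String name;
        City(String name) { this.name = name; }
    }

    static final class MyClass {
        final String country;
        final City city;
        MyClass(String country, City city) { this.country = country; this.city = city; }
    }

    // Factory for the outer type: every writer it creates forwards
    // only the extracted inner record to the wrapped writer.
    static final class ExtractingWriterFactory<OUTER, INNER> implements BulkWriterFactory<OUTER> {
        private final BulkWriterFactory<INNER> innerFactory;
        private final Function<OUTER, INNER> extractor;

        ExtractingWriterFactory(BulkWriterFactory<INNER> innerFactory,
                                Function<OUTER, INNER> extractor) {
            this.innerFactory = innerFactory;
            this.extractor = extractor;
        }

        @Override
        public BulkWriter<OUTER> create(OutputStream out) throws IOException {
            final BulkWriter<INNER> inner = innerFactory.create(out);
            return new BulkWriter<OUTER>() {
                @Override public void addElement(OUTER element) throws IOException {
                    inner.addElement(extractor.apply(element));
                }
                @Override public void flush() throws IOException { inner.flush(); }
                @Override public void finish() throws IOException { inner.finish(); }
            };
        }
    }

    // The bucket id is still computed from the outer object, so country stays available.
    static String bucketId(MyClass element) {
        return "country=" + element.country;
    }

    // Toy inner factory: writes one city name per line instead of Parquet.
    static BulkWriterFactory<City> lineWriterFactory() {
        return out -> new BulkWriter<City>() {
            @Override public void addElement(City c) throws IOException {
                out.write((c.name + "\n").getBytes(StandardCharsets.UTF_8));
            }
            @Override public void flush() throws IOException { out.flush(); }
            @Override public void finish() throws IOException { out.flush(); }
        };
    }

    public static void main(String[] args) throws IOException {
        MyClass element = new MyClass("IT", new City("Rome"));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BulkWriter<MyClass> writer =
            new ExtractingWriterFactory<MyClass, City>(lineWriterFactory(), m -> m.city).create(out);
        writer.addElement(element);
        writer.finish();
        System.out.println(bucketId(element) + " -> " + out.toString("UTF-8").trim());
    }
}
```

In a real job the inner factory would presumably be the one returned by `ParquetAvroWriters.forSpecificRecord(..)`, the wrapper would implement `BulkWriter.Factory<myClass>`, and the bucket id would come from a `BucketAssigner<myClass, String>`. Flink factories must be serializable, so the extractor function would have to be serializable as well.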

Out of curiosity, I guess that in the BucketingSink you were using the
AvroKeyValueSinkWriter, right?

Cheers,
Kostas
