Posted to user@flink.apache.org by Tim Josefsson <ti...@webstep.se> on 2023/02/24 13:17:30 UTC

Choosing compression codec when using parquet file sink

I'm writing a Flink processor that will read a bunch of JSON records from
Kafka and then write them to S3 in Parquet format using the FileSink. I've
got most things in place; the only thing I haven't been able to figure out
is how to change the compression codec used by the writer. Is there a
recommended way to do this? Currently I'm using
AvroParquetWriters.forReflectRecord(PlayerEvent.class) to transform my
POJOs to Avro and then write them as Parquet files. I've looked into the
AvroParquetWriters class but couldn't figure out how to configure the
compression codec (or even which codec is used). Is there a way to
configure this, or do I have to write my own implementation of the Parquet
writer? If so, how would one do that?

Thankful for any help,
Tim

Re: Choosing compression codec when using parquet file sink

Posted by Tiansu Yu <ti...@icloud.com>.
Hi, Tim. 

If you look at the docs here https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem/#format-types-1, you just need to write a custom `AvroWriterFactory`, where you can pass parameters such as the codec to your Avro writer. Despite what the name suggests, it is a bulk format factory and plugs into `FileSink.forBulkFormat` the same way the Parquet writer factories do. I copy the example below:

```
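import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.avro.AvroBuilder;
import org.apache.flink.formats.avro.AvroWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;

// Typed as AvroWriterFactory<Address> so the resulting sink matches DataStream<Address>.
AvroWriterFactory<Address> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
	Schema schema = ReflectData.get().getSchema(Address.class);
	DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);

	DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
	dataFileWriter.setCodec(CodecFactory.snappyCodec()); // codec for the Avro container file
	dataFileWriter.create(schema, out);
	return dataFileWriter;
});

DataStream<Address> stream = ...
stream.sinkTo(FileSink.forBulkFormat(
	outputBasePath,
	factory).build());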
```
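One caveat: the `DataFileWriter` in the example above produces Avro container files, so for Parquet output the same idea would have to go through `ParquetWriterFactory` with a custom `ParquetBuilder`. Below is a minimal sketch of that, assuming the `flink-parquet` and `parquet-avro` dependencies are on the classpath. The class name `CompressedAvroParquetWriters` and its method signatures are hypothetical, made up for illustration. As far as I can tell the Parquet builder defaults to no compression unless a codec is set, but please verify that against your Parquet version.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

/** Hypothetical helper; the class and method names are illustrative only. */
public final class CompressedAvroParquetWriters {

    private CompressedAvroParquetWriters() {}

    /** Builds a Parquet writer factory for reflect-based Avro records with an explicit codec. */
    public static <T> ParquetWriterFactory<T> forReflectRecord(
            Class<T> type, CompressionCodecName codec) {
        // The builder lambda is serialized with the job, so capture the schema
        // as a String and re-parse it when the writer is created.
        final String schemaString = ReflectData.get().getSchema(type).toString();
        final ParquetBuilder<T> builder = out -> createWriter(schemaString, codec, out);
        return new ParquetWriterFactory<>(builder);
    }

    private static <T> ParquetWriter<T> createWriter(
            String schemaString, CompressionCodecName codec, OutputFile out) throws IOException {
        final Schema schema = new Schema.Parser().parse(schemaString);
        return AvroParquetWriter.<T>builder(out)
                .withSchema(schema)
                .withDataModel(ReflectData.get())
                .withCompressionCodec(codec) // e.g. SNAPPY, GZIP, ZSTD
                .build();
    }
}
```

The factory could then be passed to the sink in place of `AvroParquetWriters.forReflectRecord(PlayerEvent.class)`, for example `FileSink.forBulkFormat(outputBasePath, CompressedAvroParquetWriters.forReflectRecord(PlayerEvent.class, CompressionCodecName.SNAPPY)).build()`.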
Best,
Tiansu 
 
