Posted to user@flink.apache.org by Kyle Hamlin <ha...@gmail.com> on 2018/01/05 21:11:41 UTC

Dynamically get schema from element to pass to AvroParquetWriter

I implemented an Avro-to-Parquet writer that previously took an Avro
schema as a string in its constructor and passed it to the
AvroParquetWriter. Now I'm wondering whether there is a way to get the
schema from the element itself and pass it to the AvroParquetWriter. I
tried grabbing the schema from the element in the write method, but write
is called after open, so that doesn't seem to work. I need this because
I'm sinking several Kafka topics to S3 in one app, so messages from
different topics need different schemas passed to the writer.

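import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class ParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _

  override def write(element: T): Unit = {
    // Too late: open() has already built the writer by the time this runs.
    schema = element.getSchema
    writer.write(element)
  }

  override def duplicate(): ParquetSinkWriter[T] = new ParquetSinkWriter[T]()

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def open(fs: FileSystem, path: Path): Unit = {
    // Problem: open() is called before the first write(), so schema is still null here.
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

}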
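One way to work around the ordering problem, sketched here under assumptions rather than as a tested solution: have open() only remember the path, and build the AvroParquetWriter lazily on the first write(), taking the schema from that element. The class name LazySchemaParquetWriter is hypothetical. The sketch assumes every record that lands in a given part file shares one schema; with records from several topics in one sink, that would presumably need a custom bucketer that routes each schema to its own bucket.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Hypothetical sketch: defer building the ParquetWriter until the first element
// arrives, because open() runs before write() and has no element to read a schema from.
// Assumes all records written to one part file share the same Avro schema.
class LazySchemaParquetWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var path: Path = _
  @transient private var writer: ParquetWriter[T] = _

  // Only remember where to write; the schema is not known yet.
  override def open(fs: FileSystem, path: Path): Unit = {
    this.path = path
    this.writer = null
  }

  override def write(element: T): Unit = {
    if (writer == null) {
      // First element for this part file: take the schema from the record itself.
      writer = AvroParquetWriter.builder[T](path)
        .withSchema(element.getSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()
    }
    writer.write(element)
  }

  // Report 0 bytes until the underlying writer exists.
  override def getPos: Long = if (writer == null) 0L else writer.getDataSize

  override def flush(): Long = getPos

  override def close(): Unit = {
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  override def duplicate(): LazySchemaParquetWriter[T] = new LazySchemaParquetWriter[T]()
}
```

A caveat of this design: if a part file is opened but never written to, no Parquet file is created for it, so it is worth checking how the surrounding sink handles that case.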

Re: Dynamically get schema from element to pass to AvroParquetWriter

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Kyle,

I'm not sure I understand the problem. I assume you have one sink for each
Avro type (Kafka topic).
If you have multiple sinks, why is it not possible to configure each one
with the correct Avro schema?

Best, Fabian
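Fabian's suggestion, one sink per topic with each sink's writer configured with that topic's schema, might look roughly like the sketch below. The schema is kept as a JSON string, as in the original constructor-based version, and parsed in open(). The class name FixedSchemaParquetWriter is hypothetical.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Hypothetical sketch: one writer per Kafka topic, configured up front with that
// topic's schema. The schema travels as a JSON string and is parsed in open().
class FixedSchemaParquetWriter[T <: GenericRecord](schemaJson: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    // The schema is known before any element arrives, so open() can build the writer.
    val schema = new Schema.Parser().parse(schemaJson)
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def close(): Unit = writer.close()

  // Copies keep the same schema string.
  override def duplicate(): FixedSchemaParquetWriter[T] =
    new FixedSchemaParquetWriter[T](schemaJson)
}
```

Each topic's stream would then get its own BucketingSink, for example `new BucketingSink[GenericRecord](basePath).setWriter(new FixedSchemaParquetWriter[GenericRecord](topicSchemaJson))`, where `basePath` and `topicSchemaJson` are placeholders.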
