Posted to user@flink.apache.org by Vikash Dat <da...@gmail.com> on 2020/08/12 00:42:45 UTC

Flink Parquet Streaming FileSink with scala case class with optional fields error

I have defined a streaming file sink for Parquet to store my Scala case
class:

StreamingFileSink
  .forBulkFormat(
    new Path(appArgs.datalakeBucket),
    ParquetAvroWriters.forReflectRecord(classOf[Log])
  )
  .withBucketAssigner(new TransactionLogHiveBucketAssigner())
  .build()


where my case class is

case class Log(
  level: String,
  time_stamp: Option[Long] = None
)


When Flink tries to write a specific instance to Parquet, e.g.

Log("info", Some(1596975950000L))

it throws the following error:


org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp { }
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:373)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

Can Flink's Parquet writer not handle fields of type Option?

Thanks,

Re: Flink Parquet Streaming FileSink with scala case class with optional fields error

Posted by Arvid Heise <ar...@ververica.com>.
Hi Vikash,

The error is coming from Parquet itself in conjunction with Avro (which is
used to infer the schema of your Scala class). The inferred schema is
{
    "fields": [
        {
            "name": "level",
            "type": "string"
        },
        {
            "name": "time_stamp",
            "type": {
                "fields": [],
                "name": "Option",
                "namespace": "scala",
                "type": "record"
            }
        }
    ],
    "name": "Log",
    "namespace": "org.apache.flink.formats.parquet.avro",
    "type": "record"
}

As you can see, Avro's reflection treats Option as an arbitrary class.
Since Option itself doesn't declare any fields, Parquet rejects the
resulting empty group, which is the error you see.

I don't see an easy fix for it, but you can probably search for solutions
involving Avro's ReflectData and scala.Option. As a workaround, you can
refrain from using an Option field and go with a nullable field instead
(you can translate it into an Option with a fancy getter).
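A minimal sketch of that workaround (the field and method names here are
hypothetical, and it assumes Avro reflection maps a nullable
java.lang.Long to a plain field rather than an empty group):

```scala
// Workaround sketch: keep the stored field nullable so Avro's
// reflection sees an ordinary (boxed) Long, and expose an Option
// only through a helper method.
case class Log(level: String, time_stamp: java.lang.Long = null) {
  // The "fancy getter": wrap the nullable field into an Option.
  // Option(null) yields None; Option(x) yields Some(x).
  def timestampOpt: Option[Long] = Option(time_stamp).map(_.longValue)
}
```

Callers then use timestampOpt everywhere instead of reading the raw
nullable field directly.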

In general, if you want more control over the schema, I'd suggest going
schema first: define your Avro schema and use avrohugger to generate the
corresponding Scala class. That way, Option is properly supported.
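For illustration, an Avro schema where time_stamp is optional declares it
as a union with null (a sketch with a made-up namespace; avrohugger
generates Option[Long] for such nullable unions):

```json
{
  "type": "record",
  "name": "Log",
  "namespace": "com.example",
  "fields": [
    {"name": "level", "type": "string"},
    {"name": "time_stamp", "type": ["null", "long"], "default": null}
  ]
}
```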

Best,

Arvid



On Wed, Aug 12, 2020 at 2:43 AM Vikash Dat <da...@gmail.com> wrote:


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng