Posted to user@flink.apache.org by Fatima Omer <fa...@dlvr.com> on 2020/02/11 22:20:00 UTC

Flink complaining when trying to write to s3 in Parquet format

I have a Java app that uses a Flink SQL query to perform aggregations
on a data stream read in from Kafka. Attached is the Java file for
reference.

The query results are written to S3. Writing in JSON format works, but
when I try Parquet format, Flink complains that min_ts is an empty
optional group. I have verified that min_ts can never be null in our
scheme of things.
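
For reference, here is a minimal sketch of the kind of pipeline involved,
assuming Flink's built-in ParquetAvroWriters (the query, the AggResult POJO,
and the bucket path are placeholders, not the actual attached file):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class ParquetSinkSketch {

        // Hypothetical result POJO for a query like
        // SELECT key, MIN(ts) AS min_ts FROM events GROUP BY key
        public static class AggResult {
            public String key;
            public java.sql.Timestamp min_ts;
        }

        public static void attachSink(DataStream<AggResult> results) {
            StreamingFileSink<AggResult> sink = StreamingFileSink
                    .forBulkFormat(
                            new Path("s3://my-bucket/output"), // placeholder bucket
                            // reflection-based schema derivation, matching
                            // the forReflectRecord frame in the trace below
                            ParquetAvroWriters.forReflectRecord(AggResult.class))
                    .build();
            results.addSink(sink);
        }
    }

The forReflectRecord variant derives the Avro (and hence Parquet) schema from
the POJO's fields via reflection, which is where the min_ts group in the error
below comes from.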

Would appreciate help on this. Thanks!

Stack trace:

Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group min_ts {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
    at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)
    at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    ... 50 more

Re: Flink complaining when trying to write to s3 in Parquet format

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Fatima,

I am not super familiar with Parquet, but your issue looks related
to [1], which appears to be expected behaviour on the Parquet side.
Parquet files store only the leaf fields, not the structure of the
groups themselves, so a group with no fields has no representation in
the file and its schema could not be inferred back when reading; the
writer therefore rejects such schemas up front. Given that, I do not
think it is a bug, but feel free to check further and let us know if
I am wrong.
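
To see the failing check in isolation, you can build the schema from your
stack trace by hand and pass it to Parquet's write-schema validator (a
minimal standalone sketch using only the parquet-column classes that
appear in the trace):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.TypeUtil;

    public class EmptyGroupCheck {
        public static void main(String[] args) {
            // Build the exact shape from the stack trace: a message whose
            // min_ts field is a group with no leaf fields.
            MessageType schema = new MessageType("agg_result",
                    new GroupType(Type.Repetition.OPTIONAL, "min_ts"));
            // Throws InvalidSchemaException:
            // "Cannot write a schema with an empty group"
            TypeUtil.checkValidWriteSchema(schema);
        }
    }

So regardless of nullability, the Avro schema that reaches the writer has to
give min_ts at least one leaf field. With reflection-derived schemas, a field
type with no non-transient instance fields appears to reflect to exactly such
an empty record (java.util.Date and its subclasses behave this way, since
their internals are transient).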

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/PARQUET-278
