You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Naehee Kim <ty...@gmail.com> on 2020/10/29 23:38:42 UTC

Question about processing a 3-level List data type in parquet

Hi Flink Dev Community,

I've found RowConverter.java in flink-parquet module doesn't support
reading a 3-level list type in parquet though it is able to process a
2-level list type.

3-level

optional group my_list (LIST) {
  repeated group element {
    required binary str (UTF8);
  };
}


  2-level

optional group my_list (LIST) {
  repeated int32 element;
}

Reference:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists

The parquet file I am testing with was written by Spark job and it has a
3-level list type. When I try to process the parquet file, it runs into
'java.lang.ClassCastException: Expected instance of group converter but got
"org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
error.

I've tested with Flink 1.9 and checked RowConverter.java still remains the
same in v1.11. To process a 3-level list, I think RowConverter.java should
be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A 2-level
list is able to be processed with BasicArrayTypeInfo.). I wonder if my
understanding is correct and if you have any plan to support a 3-level List
datatype in parquet.

For your reference, here are code snippet along with stack trace.

MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
RowTypeInfo rowTypeInfo = (RowTypeInfo)
ParquetSchemaConverter.fromParquetType(readSchema);
ParquetRowInputFormat parquetInputFormat = new
ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"),
readSchema);
DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat,
rowTypeInfo);

-- stack trace

Job execution failed.
org.apache.flink.runtime.client.JobExecutionException:
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
	at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
	at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
	at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
	at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
Caught exception when processing split: null
	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
Caused by: java.lang.ClassCastException: Expected instance of group
converter but got
"org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
	at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
	at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)

Thanks,

Naehee

Re: Question about processing a 3-level List data type in parquet

Posted by Peter Huang <hu...@gmail.com>.
Hi Naehee,

Thanks for reporting the issue. Yes, it is a bug in the ParquetInputFormat.
Would you please create a jira ticket and assign to me. I will try to fix
it by the end of this weekend.
My Jira account name Zhenqiu Huang. Thanks


Best Regards
Peter Huang


On Wed, Nov 4, 2020 at 11:57 PM Naehee Kim <ty...@gmail.com> wrote:

> Hi Jingsong,
>
> Thanks for the feedback. Can you let me know the concept and timeline of
> BulkFormat/ParquetBulkFormat and the difference with ParquetInputFormat?
>
> Our use case is for backfill to process parquet files in case of any data
> issue is found in the normal processing of kafka input. Thus, we want to
> make a job to easily switch kafka input and parquet file input and vice
> versa. Wonder if ParquetBulkFormat can fit in our use case.
>
> Best,
> Naehee
>
> On Tue, Nov 3, 2020 at 10:09 PM Jingsong Li <ji...@gmail.com>
> wrote:
>
>> Hi Naehee, sorry for the late reply.
>>
>> I think you are right, there are bugs here. We didn't think about nested
>> structures very well before.
>>
>> Now we mainly focus on the new BulkFormat implementation, which we need
>> to consider when implementing the new ParquetBulkFormat.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <ty...@gmail.com> wrote:
>>
>>> Hi Jingsong,
>>>
>>> I am forwarding the email below to you, thinking you will have a good
>>> idea about my questions below. I'd appreciate it if you give your thoughts.
>>>
>>> Thanks,
>>> Naehee
>>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Naehee Kim <ty...@gmail.com>
>>> Date: Thu, Oct 29, 2020 at 4:38 PM
>>> Subject: Question about processing a 3-level List data type in parquet
>>> To: <de...@flink.apache.org>
>>>
>>>
>>> Hi Flink Dev Community,
>>>
>>> I've found RowConverter.java in flink-parquet module doesn't support
>>> reading a 3-level list type in parquet though it is able to process a
>>> 2-level list type.
>>>
>>> 3-level
>>>
>>> optional group my_list (LIST) {
>>>   repeated group element {
>>>     required binary str (UTF8);
>>>   };
>>> }
>>>
>>>
>>>   2-level
>>>
>>> optional group my_list (LIST) {
>>>   repeated int32 element;
>>> }
>>>
>>> Reference:
>>> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>>>
>>> The parquet file I am testing with was written by Spark job and it has a
>>> 3-level list type. When I try to process the parquet file, it runs into
>>> 'java.lang.ClassCastException: Expected instance of group converter but got
>>> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
>>> error.
>>>
>>> I've tested with Flink 1.9 and checked RowConverter.java still remains
>>> the same in v1.11. To process a 3-level list, I think RowConverter.java
>>> should be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A
>>> 2-level list is able to be processed with BasicArrayTypeInfo.). I wonder if
>>> my understanding is correct and if you have any plan to support a 3-level
>>> List datatype in parquet.
>>>
>>> For your reference, here are code snippet along with stack trace.
>>>
>>> MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
>>> RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
>>> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
>>> DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
>>>
>>> -- stack trace
>>>
>>> Job execution failed.
>>> org.apache.flink.runtime.client.JobExecutionException:
>>> 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> 	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>>> 	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>>> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>>> 	at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>>> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>> 	at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
>>> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>>> 	at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
>>> 	at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
>>> 	at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>> 	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
>>> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>>> 	at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>>> 	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>>> 	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>>> 	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>>> 	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>>> 	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>>> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>>> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>>> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>>> 	at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>>> 	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
>>>
>>> Thanks,
>>>
>>> Naehee
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

Re: Question about processing a 3-level List data type in parquet

Posted by Naehee Kim <ty...@gmail.com>.
Hi Jingsong,

Thanks for the feedback. Can you let me know the concept and timeline of
BulkFormat/ParquetBulkFormat and the difference with ParquetInputFormat?

Our use case is for backfill to process parquet files in case of any data
issue is found in the normal processing of kafka input. Thus, we want to
make a job to easily switch kafka input and parquet file input and vice
versa. Wonder if ParquetBulkFormat can fit in our use case.

Best,
Naehee

On Tue, Nov 3, 2020 at 10:09 PM Jingsong Li <ji...@gmail.com> wrote:

> Hi Naehee, sorry for the late reply.
>
> I think you are right, there are bugs here. We didn't think about nested
> structures very well before.
>
> Now we mainly focus on the new BulkFormat implementation, which we need to
> consider when implementing the new ParquetBulkFormat.
>
> Best,
> Jingsong
>
> On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <ty...@gmail.com> wrote:
>
>> Hi Jingsong,
>>
>> I am forwarding the email below to you, thinking you will have a good
>> idea about my questions below. I'd appreciate it if you give your thoughts.
>>
>> Thanks,
>> Naehee
>>
>>
>> ---------- Forwarded message ---------
>> From: Naehee Kim <ty...@gmail.com>
>> Date: Thu, Oct 29, 2020 at 4:38 PM
>> Subject: Question about processing a 3-level List data type in parquet
>> To: <de...@flink.apache.org>
>>
>>
>> Hi Flink Dev Community,
>>
>> I've found RowConverter.java in flink-parquet module doesn't support
>> reading a 3-level list type in parquet though it is able to process a
>> 2-level list type.
>>
>> 3-level
>>
>> optional group my_list (LIST) {
>>   repeated group element {
>>     required binary str (UTF8);
>>   };
>> }
>>
>>
>>   2-level
>>
>> optional group my_list (LIST) {
>>   repeated int32 element;
>> }
>>
>> Reference:
>> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>>
>> The parquet file I am testing with was written by Spark job and it has a
>> 3-level list type. When I try to process the parquet file, it runs into
>> 'java.lang.ClassCastException: Expected instance of group converter but got
>> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
>> error.
>>
>> I've tested with Flink 1.9 and checked RowConverter.java still remains
>> the same in v1.11. To process a 3-level list, I think RowConverter.java
>> should be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A
>> 2-level list is able to be processed with BasicArrayTypeInfo.). I wonder if
>> my understanding is correct and if you have any plan to support a 3-level
>> List datatype in parquet.
>>
>> For your reference, here are code snippet along with stack trace.
>>
>> MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
>> RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
>> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
>> DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
>>
>> -- stack trace
>>
>> Job execution failed.
>> org.apache.flink.runtime.client.JobExecutionException:
>> 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> 	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>> 	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>> 	at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> 	at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
>> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>> 	at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
>> 	at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
>> 	at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
>> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>> 	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
>> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>> 	at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>> 	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>> 	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>> 	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>> 	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>> 	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>> 	at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>> 	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
>>
>> Thanks,
>>
>> Naehee
>>
>>
>
> --
> Best, Jingsong Lee
>

Re: Question about processing a 3-level List data type in parquet

Posted by Jingsong Li <ji...@gmail.com>.
Hi Naehee, sorry for the late reply.

I think you are right, there are bugs here. We didn't think about nested
structures very well before.

Now we mainly focus on the new BulkFormat implementation, which we need to
consider when implementing the new ParquetBulkFormat.

Best,
Jingsong

On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <ty...@gmail.com> wrote:

> Hi Jingsong,
>
> I am forwarding the email below to you, thinking you will have a good idea
> about my questions below. I'd appreciate it if you give your thoughts.
>
> Thanks,
> Naehee
>
>
> ---------- Forwarded message ---------
> From: Naehee Kim <ty...@gmail.com>
> Date: Thu, Oct 29, 2020 at 4:38 PM
> Subject: Question about processing a 3-level List data type in parquet
> To: <de...@flink.apache.org>
>
>
> Hi Flink Dev Community,
>
> I've found RowConverter.java in flink-parquet module doesn't support
> reading a 3-level list type in parquet though it is able to process a
> 2-level list type.
>
> 3-level
>
> optional group my_list (LIST) {
>   repeated group element {
>     required binary str (UTF8);
>   };
> }
>
>
>   2-level
>
> optional group my_list (LIST) {
>   repeated int32 element;
> }
>
> Reference:
> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>
> The parquet file I am testing with was written by Spark job and it has a
> 3-level list type. When I try to process the parquet file, it runs into
> 'java.lang.ClassCastException: Expected instance of group converter but got
> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
> error.
>
> I've tested with Flink 1.9 and checked RowConverter.java still remains the
> same in v1.11. To process a 3-level list, I think RowConverter.java should
> be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A 2-level
> list is able to be processed with BasicArrayTypeInfo.). I wonder if my
> understanding is correct and if you have any plan to support a 3-level List
> datatype in parquet.
>
> For your reference, here are code snippet along with stack trace.
>
> MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
> RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
> DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
>
> -- stack trace
>
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException:
> 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> 	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
> 	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
> 	at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> 	at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
> 	at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
> 	at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
> 	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
> 	at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
> 	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
> 	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
> 	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
> 	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
> 	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
> 	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
> 	at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
> 	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
>
> Thanks,
>
> Naehee
>
>

-- 
Best, Jingsong Lee