You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Michael (Jira)" <ji...@apache.org> on 2022/11/16 02:50:00 UTC

[jira] [Comment Edited] (FLINK-28867) Parquet reader support nested type in array/map type

    [ https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634609#comment-17634609 ] 

Michael edited comment on FLINK-28867 at 11/16/22 2:49 AM:
-----------------------------------------------------------

I’m using Flink 1.16.0, and I would like to read Parquet file (attached), that has schema [1].

I could read this file with Spark, but when I try to read it with Flink 1.16.0 (program attached) using schema [2]

I got IndexOutOfBoundsException [3]

My code, and parquet file are attached.

[1]: Parquet Schema

{code}
root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
|    |-- extendedProperties: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key: string (nullable = true)
|    |    |    |-- value: string (nullable = true)
|    |-- sourceAccountId: string (nullable = true)
|    |-- sourceEntityId: string (nullable = true)
|    |-- sourceEntityType: string (nullable = true)
|    |-- sourceSystem: string (nullable = true)
{code}

[2]: Schema used in Flink:
{code}
static RowType getSchema()
{
RowType elementType = RowType.of(
new LogicalType[]

{ new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH) }

,
new String[]

{ "key", "value" }

);

RowType element = RowType.of(
new LogicalType[]

{ elementType }

,
new String[]

{ "element" }

);

RowType sourceEntity = RowType.of(
new LogicalType[]

{ new ArrayType(element), new VarCharType(), new VarCharType(), new VarCharType(), new VarCharType(), }

,
new String[]

{ "extendedProperties", "sourceAccountId", "sourceEntityId", "sourceEntityType", "sourceSystem" }

);

return RowType.of(
new LogicalType[]

{ new DecimalType(), new VarCharType(), sourceEntity }

,
new String[]

{ "amount", "connectionAccountId", "sourceEntity", }

);
}
{code}
[3]: Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.get(ArrayList.java:459)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 common frames omitted


was (Author: benenson):
I’m using Flink 1.16.0, and I would like to read Parquet file (attached), that has schema [1].

I could read this file with Spark, but when I try to read it with Flink 1.16.0 (program attached) using schema [2]

I got IndexOutOfBoundsException [3]

My code, and parquet file are attached.

[1]: Parquet Schema

root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
|    |-- extendedProperties: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key: string (nullable = true)
|    |    |    |-- value: string (nullable = true)
|    |-- sourceAccountId: string (nullable = true)
|    |-- sourceEntityId: string (nullable = true)
|    |-- sourceEntityType: string (nullable = true)
|    |-- sourceSystem: string (nullable = true)

[2]: Schema used in Flink:

static RowType getSchema()
{
RowType elementType = RowType.of(
new LogicalType[]

{ new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH) }

,
new String[]

{ "key", "value" }

);

RowType element = RowType.of(
new LogicalType[]

{ elementType }

,
new String[]

{ "element" }

);

RowType sourceEntity = RowType.of(
new LogicalType[]

{ new ArrayType(element), new VarCharType(), new VarCharType(), new VarCharType(), new VarCharType(), }

,
new String[]

{ "extendedProperties", "sourceAccountId", "sourceEntityId", "sourceEntityType", "sourceSystem" }

);

return RowType.of(
new LogicalType[]

{ new DecimalType(), new VarCharType(), sourceEntity }

,
new String[]

{ "amount", "connectionAccountId", "sourceEntity", }

);
}

[3]: Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.get(ArrayList.java:459)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 common frames omitted

> Parquet reader support nested type in array/map type
> ----------------------------------------------------
>
>                 Key: FLINK-28867
>                 URL: https://issues.apache.org/jira/browse/FLINK-28867
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.16.0
>            Reporter: dalongliu
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)