Posted to issues@flink.apache.org by "Michael (Jira)" <ji...@apache.org> on 2022/11/16 02:50:00 UTC
[jira] [Comment Edited] (FLINK-28867) Parquet reader support nested type in array/map type
[ https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634609#comment-17634609 ]
Michael edited comment on FLINK-28867 at 11/16/22 2:49 AM:
-----------------------------------------------------------
I’m using Flink 1.16.0, and I would like to read the attached Parquet file, which has the schema shown in [1].
I can read this file with Spark, but when I try to read it with Flink 1.16.0 (program attached) using the schema in [2],
I get the IndexOutOfBoundsException shown in [3].
My code and the Parquet file are attached.
[1]: Parquet Schema
{code}
root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
| |-- extendedProperties: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- key: string (nullable = true)
| | | |-- value: string (nullable = true)
| |-- sourceAccountId: string (nullable = true)
| |-- sourceEntityId: string (nullable = true)
| |-- sourceEntityType: string (nullable = true)
| |-- sourceSystem: string (nullable = true)
{code}
[2]: Schema used in Flink:
{code}
static RowType getSchema() {
    RowType elementType = RowType.of(
            new LogicalType[] {
                new VarCharType(VarCharType.MAX_LENGTH),
                new VarCharType(VarCharType.MAX_LENGTH)
            },
            new String[] {"key", "value"});
    RowType element = RowType.of(
            new LogicalType[] {elementType},
            new String[] {"element"});
    RowType sourceEntity = RowType.of(
            new LogicalType[] {
                new ArrayType(element),
                new VarCharType(),
                new VarCharType(),
                new VarCharType(),
                new VarCharType()
            },
            new String[] {
                "extendedProperties",
                "sourceAccountId",
                "sourceEntityId",
                "sourceEntityType",
                "sourceSystem"
            });
    return RowType.of(
            new LogicalType[] {new DecimalType(), new VarCharType(), sourceEntity},
            new String[] {"amount", "connectionAccountId", "sourceEntity"});
}
{code}
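For reference, Spark writes array columns using the standard Parquet three-level LIST layout, so the extendedProperties column in the attached file presumably looks something like the sketch below (the exact field repetition and nullability are assumptions derived from the Spark schema in [1], not read from the file itself):
{code}
optional group extendedProperties (LIST) {
  repeated group list {
    optional group element {
      optional binary key (STRING);
      optional binary value (STRING);
    }
  }
}
{code}
Note the extra repeated "list" group between the array field and the "element" struct; a reader mapping a Flink ArrayType onto this file has to step through that wrapper level when resolving child fields, which is where GroupType.getType is being called in the stack trace below.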
[3]: Execution Exception:
2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.get(ArrayList.java:459)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 common frames omitted
> Parquet reader support nested type in array/map type
> ----------------------------------------------------
>
> Key: FLINK-28867
> URL: https://issues.apache.org/jira/browse/FLINK-28867
> Project: Flink
> Issue Type: Sub-task
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.16.0
> Reporter: dalongliu
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)