Posted to user@flink.apache.org by Meghajit Mazumdar <me...@gojek.com> on 2022/01/07 05:10:34 UTC

RowType for complex types in Parquet File

Hello,

The Flink documentation mentions this
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>
and shows how to create a FileSource for reading Parquet files.
For primitive Parquet types like BINARY and BOOLEAN, I am able to create a
RowType and read the fields.

However, my Parquet schema also contains nested fields, like the one below,
which I want to read:

  optional group location = 11 {
    optional double latitude = 1;
    optional double longitude = 2;
  }

How can I create a RowType for this? I tried the code below, but I got an
exception: `Caused by: java.lang.UnsupportedOperationException:
Complex types not supported`

    RowType nestedRowType = RowType.of(
            new LogicalType[] {new DoubleType(), new DoubleType()},
            new String[] {"latitude", "longitude"});
    final LogicalType[] fieldTypes = new LogicalType[] {nestedRowType};
    final ParquetColumnarRowInputFormat<FileSourceSplit> format =
            new ParquetColumnarRowInputFormat<>(
                    new Configuration(),
                    RowType.of(fieldTypes, new String[] {"location"}),
                    500,
                    false,
                    true);
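
For completeness, this is roughly what works for me with flat (primitive-only)
schemas, following the linked documentation. The column names, types, and file
path here are placeholders, not my actual schema:

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Flat schema: every column is a primitive type, so no nested RowType is needed.
final LogicalType[] fieldTypes =
        new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};
final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(fieldTypes, new String[] {"f0", "f1", "f2"}),
                500,    // batch size
                false,  // UTC timestamps
                true);  // case-sensitive column matching
final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, new Path("/path/to/file.parquet"))
                .build();
```

It is only when one of the fields is itself a RowType (as in the `location`
group above) that the exception is thrown.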

Re: RowType for complex types in Parquet File

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Hi,
Isn't this actually already implemented and planned for version 1.15?
https://issues.apache.org/jira/browse/FLINK-17782

Regards,
Krzysztof Chmielewski


Re: RowType for complex types in Parquet File

Posted by Jing Ge <ji...@ververica.com>.
Hi Meghajit,

As the exception indicates, Parquet schemas with nested columns are not
currently supported. This is on our todo list with high priority.

Best regards
Jing
