Posted to user@flink.apache.org by Anne Lai <an...@gmail.com> on 2022/01/25 20:23:34 UTC

Huge data size difference using Table and DataStream API

Hi,

We observed a huge data size difference when loading Parquet files from the
filesystem using the DataStream API versus the Table API. As you can see in
the screenshots below, the same dataset ends up as 67 GB and 33 MB
respectively. This not only causes performance issues (loading a table takes
minutes with the DataStream API, while it usually finishes within seconds
with the Table API) but also makes downstream operators more likely to fail.

[image: flink-datastream-api-size.png]
[image: flink-table-api-size.png]

This is how we read Parquet files into a table, basically following the
examples from the Flink documentation. We're using Flink 1.14.2 in BATCH
execution mode.
[image: Screen Shot 2022-01-25 at 12.02.49 PM.png]
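
For reference, since the screenshot may not come through on the list, the
setup looks roughly like the sketch below, following the Flink 1.14
documentation example for the Parquet format. The field names, types, and
path are placeholders, not our real schema:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class ParquetSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Placeholder schema: names and types stand in for our real fields.
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                        new String[] {"id", "payload"});

        // Vectorized Parquet reader; constructor arguments per the 1.14 docs:
        // (hadoopConfig, projectedRowType, batchSize, isUtcTimestamp, isCaseSensitive)
        ParquetColumnarRowInputFormat<FileSourceSplit> format =
                new ParquetColumnarRowInputFormat<>(
                        new Configuration(), rowType, 500, false, true);

        FileSource<RowData> source =
                FileSource.forBulkFileFormat(format, new Path("/path/to/parquet")).build();

        DataStream<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-file-source");

        stream.print();
        env.execute();
    }
}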

Please advise on how we can further investigate this, or correct any errors
in the code. Appreciate your help!

Thanks,
Anne

Re: Huge data size difference using Table and DataStream API

Posted by Anne Lai <an...@gmail.com>.
Hi Alexander,

The results from both pipelines are the same. We only specify the names and
types of each field, and no processing logic is applied up to that point.

Hi Yun,

We’re using ParquetColumnarRowInputFormat, which is the format provided for
Parquet data when using FileSource. I found in the codebase that it produces
a RowData iterator, while the Table/SQL API, on the other hand, uses the Row
type. Do you think this could be causing any unexpected impacts?
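
If it helps, this is the kind of quick check I have in mind (a sketch;
`stream` is the DataStream<RowData> from our Parquet source, and "events" is
a placeholder table name, not our real one):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;

public class RowTypeCheckSketch {
    static void inspect(StreamExecutionEnvironment env, DataStream<RowData> stream) {
        // DataStream side: the FileSource with ParquetColumnarRowInputFormat
        // emits internal RowData records.
        System.out.println(stream.getType());

        // Table side: converting a registered table back to a DataStream
        // yields external Row records ("events" is a placeholder name).
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStream<Row> rows = tableEnv.toDataStream(tableEnv.from("events"));
        System.out.println(rows.getType());
    }
}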

Best,
Anne

Re: Huge data size difference using Table and DataStream API

Posted by Alexander Fedulov <al...@ververica.com>.
Hi Anne,

Parquet supports predicate pushdown, but only for the Table/SQL API. If you
do any significant projections/filtering in your processing logic, this is
where the difference might come from. Are the results you get at the end of
both pipelines the same?
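
To illustrate (a sketch; the table name, columns, and path are placeholders):
a projection or filter expressed through Table/SQL can be pushed down into
the Parquet source, while an equivalent .filter() on a DataStream only runs
after every record has already been read and deserialized.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PushdownSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Placeholder table over the same Parquet directory.
        tableEnv.executeSql(
                "CREATE TABLE events ("
                        + "  id BIGINT,"
                        + "  payload STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '/path/to/parquet',"
                        + "  'format' = 'parquet'"
                        + ")");

        // The planner can push the projection (and, where supported, the
        // predicate) into the Parquet reader, so far less data is materialized.
        tableEnv.executeSql("SELECT id FROM events WHERE id > 100").print();
    }
}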

Best,
Alexander Fedulov

Re: Huge data size difference using Table and DataStream API

Posted by Yun Tang <my...@live.com>.
Hi Anne,

The Table API usually uses a different data format to communicate between operators than the DataStream API does; you can check the data types after loading the Parquet file with the Table API.

From your description, the Table API behaves much better than the DataStream API; do you want to investigate why the DataStream API performs poorly?
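
For example, something like the following sketch would show the resolved
types and the physical plan on the Table API side ("events" is a placeholder
table name):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class SchemaCheckSketch {
    static void show(TableEnvironment tableEnv) {
        Table t = tableEnv.from("events");
        t.printSchema();                  // resolved column names and types
        System.out.println(t.explain());  // physical plan, including what the source reads
    }
}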


Best
Yun Tang