Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/12/21 13:10:00 UTC

[jira] [Updated] (FLINK-30476) TrackingFsDataInputStream batch tracking issue

     [ https://issues.apache.org/jira/browse/FLINK-30476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-30476:
-----------------------------------
    Labels: pull-request-available  (was: )

> TrackingFsDataInputStream batch tracking issue
> ----------------------------------------------
>
>                 Key: FLINK-30476
>                 URL: https://issues.apache.org/jira/browse/FLINK-30476
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.15.1, 1.15.2, 1.15.3
>            Reporter: Denis
>            Priority: Major
>              Labels: pull-request-available
>
> {{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream}} wraps the underlying {{InputStream}} to count the bytes consumed.
> {{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.Reader}} relies on this count to split the input into batches:
> {code:java}
>             while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
>                 result.add(next);
>             }
> {code}
> {{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream#read(byte[], int, int)}} contains a bug that can lead to batches of arbitrary size, due to underflow of the counter ({{remainingInBatch}}).
> {code:java}
>         public int read(byte[] b, int off, int len) throws IOException {
>             // BUG: decrements by the requested len, not by the number of
>             // bytes actually returned by stream.read(), which may be fewer.
>             remainingInBatch -= len;
>             return stream.read(b, off, len);
>         }
> {code}
> Every call to {{stream.read(b, off, len)}} may return fewer than {{len}} bytes, per its Javadoc:
> {code:java}
> Params:
> b – the buffer into which the data is read.
> off – the start offset in array b at which the data is written.
> len – the maximum number of bytes to read.
> Returns:
> the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached.
> {code}
> But the current implementation accounts only for the bytes that were requested ({{len}}), not the bytes that were actually read.
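> A minimal sketch of a possible fix (illustrative only, not necessarily the change in the linked pull request) would decrement {{remainingInBatch}} by the number of bytes actually read:
> {code:java}
>         @Override
>         public int read(byte[] b, int off, int len) throws IOException {
>             int bytesRead = stream.read(b, off, len);
>             // Account only for the bytes actually read; read() may return
>             // fewer than len, or -1 at the end of the stream.
>             if (bytesRead > 0) {
>                 remainingInBatch -= bytesRead;
>             }
>             return bytesRead;
>         }
> {code}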
> E.g. the S3 Hadoop FS can return fewer than {{len}} bytes from {{stream.read(b, off, len)}}. This is expected, and readers handle it by retrying in a loop, as e.g.
> {{org.apache.parquet.io.DelegatingSeekableInputStream#readFully(java.io.InputStream, byte[], int, int)}} does.
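> For reference, a simplified {{readFully}}-style retry loop (a sketch modeled on the Parquet helper above, not its exact source) shows why one logical read can translate into several {{read()}} calls, each of which decrements the counter by the full requested length:
> {code:java}
>     // Simplified sketch: retry until the buffer is filled, so a short
>     // read triggers additional read() calls on the tracking stream.
>     static void readFully(java.io.InputStream stream, byte[] bytes, int start, int len)
>             throws java.io.IOException {
>         int offset = start;
>         int remaining = len;
>         while (remaining > 0) {
>             int bytesRead = stream.read(bytes, offset, remaining);
>             if (bytesRead < 0) {
>                 throw new java.io.EOFException("Reached the end of stream");
>             }
>             offset += bytesRead;
>             remaining -= bytesRead;
>         }
>     }
> {code}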
> As a result, reading a Parquet file may underflow the counter in {{TrackingFsDataInputStream#read(byte[], int, int)}}: the Parquet reader tries to read a whole row group (which can be large) and may call {{read()}} multiple times, each call subtracting the full requested {{len}}. The underflow can leave the counter looking positive again, so the batch size becomes effectively unlimited and may lead to an OOM.
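> To illustrate with hypothetical numbers (assuming {{remainingInBatch}} is an {{int}} and {{hasRemainingInBatch()}} checks {{remainingInBatch > 0}}): a few short reads against a large row group are enough to wrap the counter past {{Integer.MIN_VALUE}} back into positive territory, after which the batch never ends.
> {code:java}
> public class UnderflowDemo {
>     public static void main(String[] args) {
>         int remainingInBatch = 1 << 20;  // 1 MiB batch budget
>         int len = 1 << 30;               // ~1 GiB requested per read() call
>         remainingInBatch -= len;         // -1,072,693,248: batch should end here
>         remainingInBatch -= len;         // -2,146,435,072: close to Integer.MIN_VALUE
>         remainingInBatch -= len;         // wraps around to +1,074,790,400
>         // With the counter positive again, hasRemainingInBatch() would keep
>         // returning true and the batch would keep growing.
>         System.out.println(remainingInBatch > 0);  // prints: true
>     }
> }
> {code}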



--
This message was sent by Atlassian Jira
(v8.20.10#820010)