Posted to dev@flink.apache.org by "Denis (Jira)" <ji...@apache.org> on 2022/12/21 12:58:00 UTC

[jira] [Created] (FLINK-30476) TrackingFsDataInputStream batch tracking issue

Denis created FLINK-30476:
-----------------------------

             Summary: TrackingFsDataInputStream batch tracking issue
                 Key: FLINK-30476
                 URL: https://issues.apache.org/jira/browse/FLINK-30476
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.15.3, 1.15.2, 1.15.1
            Reporter: Denis


{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream}} wraps the underlying InputStream to count the bytes consumed.
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.Reader}} relies on this to create batches of data.
{code:java}
            while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
                result.add(next);
            }
{code}
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream#read(byte[], int, int)}} contains a bug that can produce arbitrarily large batches because the counter ({{remainingInBatch}}) underflows.
{code:java}
        public int read(byte[] b, int off, int len) throws IOException {
            remainingInBatch -= len;
            return stream.read(b, off, len);
        }
{code}
Each call to {{stream.read()}} may return fewer than {{len}} bytes, according to the javadoc.
{code:java}
Params:
b – the buffer into which the data is read.
off – the start offset in array b at which the data is written.
len – the maximum number of bytes to read.
Returns:
the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached.
{code}
But the current implementation counts the bytes that were requested ({{len}}), not the bytes that were actually read.
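
A minimal sketch of one possible fix, not the actual Flink patch: decrement the counter by the number of bytes the underlying {{read}} returns instead of by {{len}}. {{TrackingInputStream}} and its {{demo()}} helper are hypothetical, simplified stand-ins for {{TrackingFsDataInputStream}}, and {{hasRemainingInBatch()}} is assumed to be a plain "greater than zero" check.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical, simplified stand-in for TrackingFsDataInputStream (not the Flink class).
class TrackingInputStream extends InputStream {
    private final InputStream stream;
    private int remainingInBatch;

    TrackingInputStream(InputStream stream, int batchSize) {
        this.stream = stream;
        this.remainingInBatch = batchSize;
    }

    @Override
    public int read() throws IOException {
        int b = stream.read();
        if (b >= 0) {
            remainingInBatch--; // one byte was actually consumed
        }
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = stream.read(b, off, len);
        if (n > 0) {
            remainingInBatch -= n; // count bytes actually read, not bytes requested
        }
        return n; // may be less than len, or -1 at end of stream
    }

    // Assumption: the batch continues while the counter is positive.
    boolean hasRemainingInBatch() {
        return remainingInBatch > 0;
    }

    int remainingInBatch() {
        return remainingInBatch;
    }

    // Usage example: request 64 bytes from a 10-byte source with a 100-byte batch budget.
    static int[] demo() {
        try {
            TrackingInputStream in =
                    new TrackingInputStream(new ByteArrayInputStream(new byte[10]), 100);
            int n = in.read(new byte[64], 0, 64); // short read
            return new int[] {n, in.remainingInBatch()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In {{demo()}} the short read returns 10 bytes, so the counter drops from 100 to 90; with the accounting quoted above it would drop to 36 even though only 10 bytes were consumed.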

For example, the S3 Hadoop FS can return fewer than {{len}} bytes from {{stream.read(b, off, len)}}. This is expected, and readers already handle it; see
{{org.apache.parquet.io.DelegatingSeekableInputStream#readFully(java.io.InputStream, byte[], int, int)}}

As a result, reading a Parquet file can underflow the counter in {{TrackingFsDataInputStream#read(byte[], int, int)}}, because the Parquet reader tries to read a whole (large) Row Group and may call {{read()}} several times to do so. After the underflow the batch size is effectively unlimited, which can lead to an OOM.
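
To illustrate how the counter can end up positive again, here is a simplified simulation of the faulty accounting. It assumes {{remainingInBatch}} is an {{int}} and that {{hasRemainingInBatch()}} checks {{remainingInBatch > 0}}; the class name, the 1 MiB batch size and the 512 MiB request size are made up for the example and are not taken from Flink or Parquet.

```java
// Hypothetical simulation of the faulty accounting (not Flink code).
class UnderflowDemo {

    /** Subtracts the requested length on every read call, as the buggy code does. */
    static int simulate(int batchSize, int requestedLen, int calls) {
        int remainingInBatch = batchSize;
        for (int i = 0; i < calls; i++) {
            remainingInBatch -= requestedLen; // int arithmetic wraps around silently
        }
        return remainingInBatch;
    }

    public static void main(String[] args) {
        int batchSize = 1 << 20;    // 1 MiB, illustrative
        int requestedLen = 1 << 29; // 512 MiB, illustrative
        System.out.println(simulate(batchSize, requestedLen, 4)); // -2146435072
        System.out.println(simulate(batchSize, requestedLen, 5)); // 1611661312
    }
}
```

After four calls the counter is negative, which would end the batch. The fifth call wraps it around to roughly 1.5 GiB of "remaining" budget, so under the stated assumptions the read loop would keep adding records to the same batch.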



--
This message was sent by Atlassian Jira
(v8.20.10#820010)