Posted to issues@flink.apache.org by "Etienne Chauchot (Jira)" <ji...@apache.org> on 2023/09/18 08:22:00 UTC

[jira] [Comment Edited] (FLINK-30314) Unable to read all records from compressed delimited file format

    [ https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762651#comment-17762651 ] 

Etienne Chauchot edited comment on FLINK-30314 at 9/18/23 8:21 AM:
-------------------------------------------------------------------

Made the ticket description more general (the delimited input format is splittable, but not when the file is compressed). Added tests on newer Flink versions and reduced the module scope to just the file connector (the problem is in the file connector in general, which is used by the Table API but also by other APIs).

The problem is with DelimitedInputFormat: none of its subclasses call FileInputFormat#createSplits (which would detect that the file is non-splittable and handle the reading boundaries correctly); instead they all use FileSource in org.apache.flink.connector.file.src, which creates its own splits.
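For illustration, here is a minimal sketch of the guard that FileInputFormat#createSplits provides and that the FileSource path lacks. It is not Flink's actual code: the class name, the suffix list, and the single-split fallback are assumptions made for the example. The point is that a compressed file must map to exactly one split, and the reader of that split must stop at end of stream rather than at the split length.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitSketch {

    private static final List<String> COMPRESSED_SUFFIXES =
            Arrays.asList(".gz", ".zip", ".bz2", ".xz");

    static boolean isCompressed(String path) {
        return COMPRESSED_SUFFIXES.stream().anyMatch(path::endsWith);
    }

    // Returns {offset, length} pairs. A compressed file gets exactly one
    // split covering the whole file; a reader of that split must read to
    // end of stream, because 'fileLen' is the compressed size, not the
    // decompressed one.
    static List<long[]> createSplits(String path, long fileLen, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        if (isCompressed(path)) {
            splits.add(new long[] {0L, fileLen});
            return splits;
        }
        for (long offset = 0; offset < fileLen; offset += splitSize) {
            splits.add(new long[] {offset, Math.min(splitSize, fileLen - offset)});
        }
        return splits;
    }
}
{code}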


> Unable to read all records from compressed delimited file format
> ----------------------------------------------------------------
>
>                 Key: FLINK-30314
>                 URL: https://issues.apache.org/jira/browse/FLINK-30314
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.0, 1.15.2, 1.17.1
>            Reporter: Dmitry Yaraev
>            Assignee: Etienne Chauchot
>            Priority: Major
>         Attachments: input.json, input.json.gz, input.json.zip
>
>
> I am reading gzipped JSON line-delimited files in batch mode using the [FileSystem Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/]. To read the files, a new table is created with the following configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
>   `my_field1` BIGINT,
>   `my_field2` INT,
>   `my_field3` VARCHAR(2147483647)
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'path-to-input-dir',
>   'format' = 'json',
>   'json.ignore-parse-errors' = 'false',
>   'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-00000.json.gz and input-00001.json.gz. As the filenames suggest, the files are compressed with GZIP. Each of the files contains 10 records. The issue is that only 2 records from each file are read (4 in total). If decompressed versions of the same data files are used, all 20 records are read.
> As far as I understand, the problem may be related to the fact that the split length used when reading the files is in fact the length of the compressed file. So the files are closed before all records are read from them, because the read position of the decompressed file stream exceeds the split length.
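> To make the mismatch concrete, here is a minimal standalone sketch (not Flink code; it assumes the attached input.json.gz is available in the working directory) showing that the decompressed byte count of a gzipped file exceeds its on-disk length, so any reader bounded by the compressed length stops early:
> {code:java}
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
>
> public class GzipLengthDemo {
>     public static void main(String[] args) throws IOException {
>         File f = new File("input.json.gz"); // attachment from this ticket
>         long compressedLen = f.length();
>         long decompressedLen = 0;
>         try (GZIPInputStream in = new GZIPInputStream(new FileInputStream(f))) {
>             byte[] buf = new byte[8192];
>             for (int n; (n = in.read(buf)) != -1; ) {
>                 decompressedLen += n;
>             }
>         }
>         // A reader that stops once its read position reaches 'compressedLen'
>         // sees only a prefix of the decompressed data and drops records.
>         System.out.println("compressed=" + compressedLen
>                 + ", decompressed=" + decompressedLen);
>     }
> }
> {code}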
> Probably, it makes sense to add a flag to {{FSDataInputStream}}, so we could identify whether the file is compressed or not. The flag can be set to true in {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file streams. With such a flag it would be possible to differentiate non-splittable compressed files and rely only on the end of the stream.
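> As a rough sketch of that proposal (the class and method names below are hypothetical, not Flink's API), a wrapper stream could carry the flag, and a reader seeing the flag set would ignore the split end offset and read until read() returns -1:
> {code:java}
> import java.io.FilterInputStream;
> import java.io.InputStream;
>
> // Hypothetical names; this is a sketch of the proposal, not Flink's API.
> class CompressionAwareStream extends FilterInputStream {
>
>     private final boolean compressed;
>
>     CompressionAwareStream(InputStream wrapped, boolean compressed) {
>         super(wrapped);
>         this.compressed = compressed;
>     }
>
>     // True when the wrapped bytes are decompressed on the fly, i.e. the
>     // split length counts compressed bytes and cannot bound reading.
>     boolean isCompressed() {
>         return compressed;
>     }
> }
> {code}
> A format reader could then check the flag once per split and, when set, switch to end-of-stream termination instead of trusting the split length.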



--
This message was sent by Atlassian Jira
(v8.20.10#820010)