Posted to user@flink.apache.org by Austin Cawley-Edwards <au...@gmail.com> on 2020/10/16 18:32:28 UTC

Un-ignored Parsing Exceptions in the CsvFormat

Hey all,

I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].

Even with `ignoreParseErrors()` set, the job fails when it encounters
certain kinds of malformed rows. The root cause is indeed a `ParseException`,
so I'm wondering whether there's anything more I need to do to ignore these
rows. Each field in the schema is a STRING.


I've configured the CSV format and table like so:

tableEnv.connect(
    new FileSystem()
        .path(path)
)
    .withFormat(
        new Csv()
            .quoteCharacter('"')
            .ignoreParseErrors()
    )
    .withSchema(schema)
    .inAppendMode()


Shot in the dark, but should `RowCsvInputFormat#parseRecord` check
`isLenient()` when it hits an unexpected parser position?[2]
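
Concretely, something like this is what I have in mind -- just a sketch
against the linked lines, not a tested patch:

// Inside RowCsvInputFormat#parseRecord: skipFields returns -1 when it
// cannot find the end of a field it is skipping (e.g. an unterminated
// quote). A lenient check here could drop the record instead of
// failing the whole job.
if (startPos < 0) {
    if (isLenient()) {
        return false; // skip the malformed record
    }
    throw new ParseException(String.format(
        "Unexpected parser position for column %1$s of row '%2$s'",
        field, new String(bytes, offset, numBytes)));
}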

Example error:

2020-10-16 12:50:18
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: org.apache.flink.api.common.io.ParseException: Unexpected parser position for column 1 of row '",
https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
"",,,,company,'
    at org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)


Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
[2]:
https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206

Re: Un-ignored Parsing Exceptions in the CsvFormat

Posted by Khachatryan Roman <kh...@gmail.com>.
Hey Austin,

I assigned the ticket; it would be great if you could fix it!

Regards,
Roman


On Thu, Oct 22, 2020 at 5:08 PM Austin Cawley-Edwards <austin.cawley@gmail.com> wrote:

> Hey Roman,
>
> Sorry to miss this -- thanks for the confirmation and making the ticket.
> I'm happy to propose a fix if someone is able to assign the ticket to me.
>
> Best,
> Austin

Re: Un-ignored Parsing Exceptions in the CsvFormat

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Hey Roman,

Sorry to miss this -- thanks for the confirmation and making the ticket.
I'm happy to propose a fix if someone is able to assign the ticket to me.

Best,
Austin

On Mon, Oct 19, 2020 at 6:56 AM Khachatryan Roman <khachatryan.roman@gmail.com> wrote:

> Hey Austin,
>
> I think you are right. The problematic row contains an odd number of
> quote characters, in which case skipFields will return -1, which in
> turn leads to an exception.
>
> I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711
> to fix it.
>
> Regards,
> Roman

Re: Un-ignored Parsing Exceptions in the CsvFormat

Posted by Khachatryan Roman <kh...@gmail.com>.
Hey Austin,

I think you are right. The problematic row contains an odd number of
quote characters, in which case skipFields will return -1, which in
turn leads to an exception.
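
For the record, it can be reproduced standalone along these lines
(untested sketch -- the file contents, field selection, and class name
are made up, not taken from the original job):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

public class LenientCsvRepro {
    public static void main(String[] args) throws Exception {
        // Three physical columns; column 1 starts a quote that is never
        // closed. Selecting only columns 0 and 2 forces column 1 through
        // skipFields, which returns -1 on the unterminated quote.
        java.io.File csv = java.io.File.createTempFile("bad-row", ".csv");
        Files.write(csv.toPath(), "a,\"oops,c\n".getBytes(StandardCharsets.UTF_8));

        TypeInformation<?>[] types = {Types.STRING, Types.STRING};
        RowCsvInputFormat format = new RowCsvInputFormat(
            new Path(csv.toURI()), types, new int[] {0, 2});
        format.enableQuotedStringParsing('"');
        format.setLenient(true); // what ignoreParseErrors() should amount to

        format.configure(new Configuration());
        FileInputSplit[] splits = format.createInputSplits(1);
        format.open(splits[0]);
        // Expected with lenient parsing: the bad row is skipped.
        // Actual: ParseException "Unexpected parser position for column 1 ...".
        System.out.println(format.nextRecord(new Row(2)));
    }
}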

I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711 to
fix it.

Regards,
Roman


On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <austin.cawley@gmail.com> wrote:

> Shot in the dark, but should `RowCsvInputFormat#parseRecord` check
> `isLenient()` when it hits an unexpected parser position?[2]