Posted to dev@flink.apache.org by "DONG, Weike" <ky...@connect.hku.hk> on 2020/03/17 03:44:26 UTC

Flink YARN Job unexpectedly switched to SUCCEEDED final status after CSV deserialization failure

Hi community,

I have noticed that when a malformed CSV record is ingested and
deserialized by CsvRowDeserializationSchema with ignoreParseErrors set
to false, an IOException is thrown, as expected. In earlier Flink
versions, the YARN app for a per-job mode Flink job without a
specifically configured RestartStrategy would then terminate with
FINISHED state and FAILED final status.
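
For context, the schema in my job is configured roughly like this (a
minimal sketch; the row field types here are just illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.types.Row;

// Illustrative row type; the real job uses a wider schema.
TypeInformation<Row> rowType = Types.ROW(Types.INT, Types.STRING);

// With setIgnoreParseErrors(false) (also the default), a malformed
// record makes deserialize() throw an IOException instead of being
// silently skipped.
CsvRowDeserializationSchema schema =
        new CsvRowDeserializationSchema.Builder(rowType)
                .setIgnoreParseErrors(false)
                .build();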

However, to my surprise, in Flink 1.10 the YARN app switches to SUCCEEDED
final status instead, which is odd, since the job termination was caused
by abnormal data input rather than by a normal end of the source stream.

This is confusing, because a SUCCEEDED final status normally implies to
users that the job ended *successfully*, which is definitely not the case
here. So I wonder whether this is the intended behavior of Flink 1.10 on
data deserialization errors.

Thank you for reading : )

Sincerely,
Weike

Re: Flink YARN Job unexpectedly switched to SUCCEEDED final status after CSV deserialization failure

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi dev,

After debugging and analysis, I eventually found the culprit.

To cope with occasional data-format errors and intermittent source
exceptions (the Internet connection is somewhat unstable in my
environment) that kept interrupting my Flink jobs, I had added a broad
catch block in the org.apache.flink.streaming.api.operators.StreamSource#run
method to catch any exception thrown by userFunction.run(ctx); in the
source operator. This patch worked well and kept the jobs running
smoothly without having to restart them again and again.
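
The patch is essentially the following (a simplified sketch; upstream
Flink does not contain this catch block):

// Simplified sketch of my local patch inside
// org.apache.flink.streaming.api.operators.StreamSource#run
// (upstream Flink does NOT have this catch block):
try {
    userFunction.run(ctx);
} catch (Exception e) {
    // Swallowing the exception lets run() return normally, so the
    // source looks "finished" rather than "failed". As explained
    // below, this masks the consumer-thread shutdown in KafkaFetcher.
    LOG.warn("Exception in source suppressed by local patch", e); // illustrative logging
}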

However, when the IOException is thrown in KafkaFetcher#run, that method
defines no catch block, so the exception goes straight to the finally
block, which calls consumerThread.shutdown(). Therefore, even though the
exception is later caught in StreamSource, it has already caused
KafkaFetcher to permanently shut down the consumer thread, without any
logging or warning, so Flink can do nothing but finish the job.

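Roughly, the control flow there looks like this (a heavily simplified
paraphrase of the Flink 1.10 sources, for illustration only):

// Heavily simplified paraphrase of the KafkaFetcher fetch loop
// (Flink 1.10); the real method contains much more logic.
try {
    // ... take records from the consumer thread's handover and emit
    // them; a deserialization failure throws an IOException here ...
} finally {
    // Reached on normal completion AND on exceptions: once the
    // consumer thread is shut down, no further records can ever be
    // fetched, even if the exception is swallowed upstream.
    consumerThread.shutdown();
}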

In conclusion, it is my fault for not catching the exception in
KafkaFetcher, and Flink is not the one to blame : )
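
For anyone who runs into the same issue: rather than patching Flink
internals, parse errors can be tolerated at the DeserializationSchema
level, either by simply setting ignoreParseErrors to true, or with a
small wrapper like the hypothetical one below (my own sketch, not part
of Flink). The Flink Kafka consumer skips records for which
deserialize() returns null; transient connection failures are better
handled by a RestartStrategy.

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical wrapper (my own sketch): turns deserialization
// failures into null, which the Kafka source treats as "skip this
// record" instead of failing the job.
public class SkipOnParseErrorSchema<T> implements DeserializationSchema<T> {
    private final DeserializationSchema<T> inner;

    public SkipOnParseErrorSchema(DeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (IOException e) {
            // Drop the malformed record; a real implementation
            // would also log it.
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}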

Thanks

Sincerely,
Weike
