Posted to user@flink.apache.org by Nico Kruber <ni...@data-artisans.com> on 2018/01/16 10:34:08 UTC

Re: Unrecoverable job failure after Json parse error?

Hi Adrian,
couldn't you solve this by providing your own DeserializationSchema [1],
possibly extending from JSONKeyValueDeserializationSchema and catching
the error there?


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
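
A minimal stand-alone sketch of that pattern. Note the simplified types: a real implementation would extend Flink's JSONKeyValueDeserializationSchema and catch Jackson's JsonParseException inside deserialize(); the class and method names below are hypothetical stand-ins so the sketch compiles without the Flink dependencies.

```java
import java.nio.charset.StandardCharsets;

// Stand-alone sketch of the catch-and-skip pattern. In a real job this
// would extend JSONKeyValueDeserializationSchema and catch Jackson's
// JsonParseException; the simplified signature below is a stand-in.
public class SafeJsonDeserializer {

    // Returns the decoded record, or null when the payload cannot be parsed.
    // Returning null is what lets the caller skip the corrupted message
    // instead of failing the whole job.
    public static String deserializeOrNull(byte[] message) {
        try {
            String text = new String(message, StandardCharsets.UTF_8).trim();
            // Naive stand-in for JSON parsing: accept only object literals.
            if (!text.startsWith("{") || !text.endsWith("}")) {
                throw new IllegalArgumentException("not a JSON object: " + text);
            }
            return text;
        } catch (IllegalArgumentException e) {
            // Log and skip instead of propagating the parse error.
            System.err.println("Skipping corrupted record: " + e.getMessage());
            return null;
        }
    }
}
```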

On 12/01/18 18:26, Adrian Vasiliu wrote:
> Hello,
> 
> When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> if an invalid, non-parsable message is sent to the Kafka topic, the
> consumer expectedly fails with JsonParseException. So far so good, but
> this leads to the following loop: the job switches to FAILED,
> then attempts to restart and fails again, and so on. That is, the
> parsing error leads to the Kafka message not being committed, hence it
> keeps being received.
> Since the JsonParseException can't be caught in application code, what
> would be the recommended way to handle the case of possibly
> non-parseable Kafka messages?
>
> Is there a way to configure the Flink Kafka consumer to handle
> non-parseable messages by logging the parsing error and then
> committing the message so that processing can continue? If there
> isn't, would such an enhancement make sense?
>
> Unless there is a better solution, it looks like a requirement to
> guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> can be annoying in practice.
> 
> For reference, here's the stack of the JsonParseException in the log:
> 
> Source: Custom Source(1/1) switched to FAILED
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Unexpected character (':' (code 58)): Expected space separating
> root-level values
>  at [Source: UNKNOWN; line: 1, column: 3]
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
>  at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
>  at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
>  at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
>  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:745)
> 
> My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
> 
> Thanks,
> Adrian
> Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
> Compagnie IBM France
> Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
> RCS Nanterre 552 118 465
> Forme Sociale : S.A.S.
> Capital Social : 657.364.587 €
> SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A


Re: Unrecoverable job failure after Json parse error?

Posted by Nico Kruber <ni...@data-artisans.com>.
Nice, I didn't even read that far myself :P
-> turns out the API was prepared for that after all

I'm not sure about a default option for handling/skipping corrupted
messages since the handling of those is probably highly use-case
specific. If you nonetheless feel that this should be in there, feel
free to open an improvement request in our issue tracker at
https://issues.apache.org/jira/browse/FLINK
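
For completeness, the consumer-side half of that contract, dropping records whose deserialization returned null and moving on, can be illustrated with a plain Java sketch. The class and the parseOrNull helper are hypothetical stand-ins, not Flink API; in Flink itself this skipping happens inside the Kafka fetcher's loop.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates what the Flink Kafka fetcher effectively does when a
// DeserializationSchema returns null for a record: the record is dropped
// and the fetch loop continues with the next one. All names here are
// hypothetical stand-ins, not Flink API.
public class NullSkippingLoop {

    // Stand-in for a deserializer that yields null on corrupted input.
    static String parseOrNull(String raw) {
        return raw.trim().startsWith("{") ? raw : null;
    }

    // Keeps only the records that deserialized successfully.
    public static List<String> fetchLoop(List<String> rawRecords) {
        List<String> out = new ArrayList<>();
        for (String raw : rawRecords) {
            String record = parseOrNull(raw);
            if (record == null) {
                continue; // silently skip the corrupted message
            }
            out.add(record);
        }
        return out;
    }
}
```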


Nico

On 16/01/18 13:35, Adrian Vasiliu wrote:
> Hi Nico,
> Thanks a lot. I did consider that, but I had missed the clarification of
> the contract brought by the piece of documentation you
> pointed to: "returning null to allow the Flink Kafka consumer to silently
> skip the corrupted message".
> I suppose it could be an improvement
> for JSONKeyValueDeserializationSchema to provide this behaviour as an
> out-of-the-box option. But anyway, I do have a solution in hand.
> Thanks again.
> Adrian
>  
> 
>     ----- Original message -----
>     From: Nico Kruber <ni...@data-artisans.com>
>     To: Adrian Vasiliu <va...@fr.ibm.com>, user@flink.apache.org
>     Subject: Re: Unrecoverable job failure after Json parse error?
>     Date: Tue, Jan 16, 2018 11:34 AM
>     [quoted text trimmed]
> 
>  