Posted to user@flink.apache.org by Joe Olson <te...@nododos.com> on 2017/02/16 04:03:24 UTC

Trapping Streaming Errors

If I am processing a stream in the following manner: 

val stream = env.addSource(consumer).name("KafkaStream") 
.keyBy(x => (x.obj.ID1(),x.obj.ID2(),x.obj.ID3()))
.flatMap(new FlatMapProcessor) 

and the IDs bomb out because of deserialization issues, my job crashes with a 'Could not extract key' error. How can I trap this cleanly? The only thing I can think of is to validate the IDs in the deserialization class argument that is used in the KafkaConsumer constructor, and trap any issues there. Is that the preferred way? Is there a better way? 
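A rough sketch of that deserialization-side validation, for illustration only: the Event type, the parseEvent helper, and the import paths (taken from the 1.2.x-era Kafka connector) are assumptions, not code from the actual job.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.DeserializationSchema

// Sketch only: "Event" and "parseEvent" are placeholders for the real record
// type and decoding logic.
class ValidatingEventSchema extends DeserializationSchema[Event] {

  override def deserialize(message: Array[Byte]): Event = {
    try {
      val event = parseEvent(message)   // placeholder for the real decoding
      // Touch the IDs once so a broken record fails here instead of in keyBy.
      event.obj.ID1(); event.obj.ID2(); event.obj.ID3()
      event
    } catch {
      // Returning null is documented to make the Flink Kafka consumer skip the
      // record; worth verifying for the Flink version in use.
      case _: Exception => null
    }
  }

  override def isEndOfStream(nextElement: Event): Boolean = false

  override def getProducedType: TypeInformation[Event] = createTypeInformation[Event]
}

// Hypothetical usage (constructor as in the Kafka 0.9 connector):
// val consumer = new FlinkKafkaConsumer09[Event]("topic", new ValidatingEventSchema, props)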

Re: Trapping Streaming Errors

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Joe,

you can also insert a MapFunction between the Kafka source and the keyBy to
validate the IDs.
The mapper will be chained and should add only minimal overhead. If you
want to keep the events which were incorrectly deserialized, you can use
split() to route them to a separate stream (see the sketch below).
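A minimal sketch of that map + split() approach, reusing the ID accessors from the snippet above; Event, consumer, invalidSink, and FlatMapProcessor are illustrative placeholders, not actual code from the job.

import org.apache.flink.streaming.api.scala._

// Wrapper that records whether the IDs could be extracted.
case class Validated(event: Event, valid: Boolean)

val validated = env
  .addSource(consumer).name("KafkaStream")
  // Chained to the source task, so the extra map adds only minimal overhead.
  .map { e =>
    try {
      (e.obj.ID1(), e.obj.ID2(), e.obj.ID3())   // force the ID accessors once
      Validated(e, valid = true)
    } catch {
      case _: Exception => Validated(e, valid = false)
    }
  }

// split() assigns each record to one or more named outputs; select() pulls one out.
val splitStream = validated.split(v => if (v.valid) Seq("ok") else Seq("bad"))

// Invalid records can be routed to a side sink, e.g. a log or dead-letter topic.
splitStream.select("bad").map(_.event).addSink(invalidSink)

val stream = splitStream.select("ok").map(_.event)
  .keyBy(x => (x.obj.ID1(), x.obj.ID2(), x.obj.ID3()))
  .flatMap(new FlatMapProcessor)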

Validation in the deserialization code works as well, of course, but it would
not allow you to reroute invalid events.

Best, Fabian

2017-02-16 5:03 GMT+01:00 Joe Olson <te...@nododos.com>:

> If I am processing a stream in the following manner:
>
> val stream = env.addSource(consumer).name("KafkaStream")
>                     .keyBy(x => (x.obj.ID1(),x.obj.ID2(),x.obj.ID3()))
>                     .flatMap(new FlatMapProcessor)
>
> and the IDs bomb out because of deserialization issues, my job crashes
> with a 'Could not extract key' error. How can I trap this cleanly? The only
> thing I can think of is to validate the IDs in the deserialization class
> argument that is used in the KafkaConsumer constructor, and trap any issues
> there. Is that the preferred way? Is there a better way?
>