You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Gaspar Muñoz <gm...@stratio.com> on 2016/07/04 08:46:53 UTC

Streams - Manage Json SerDe error

Hi there,

my question is about Kafka Streams. I'm writting an application using
Streams. I read JSON from Kafka topic and I make some transformations.

I'm using

Serde<JsonNode> jsonNodeSerder = Serdes.serdeFrom(new JsonSerializer(), new
JsonDeserializer());
KStream<String, JsonNode> kStream = builder.stream(Serdes.String(),
jsonNodeSerder, topic);

If, unluckily, we receive a non-json message, JsonDeserializer launch a

throw new SerializationException("Error serializing JSON message", e);

and streams threads die so the entire application dies. In order to make
robust my application  I have to skip this errors and send the error
message to another topic to be analyzed.

Currently I've implemented MyOwnJSONDeserializer extends Kafka
JsonDeserializer and I catch and manage the exception but this don't seems
really clean to me.

How would you manage this errors? I thought I might have used String SerDe
and "manually" convert to JsonNode, is there any better solution?

Thanks.

Re: Streams - Manage Json SerDe error

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Gaspar,

In your case, a single topic can have messages in different format, and my
guess is that they usually have different semantics (e.g. one format for
data record, and another format for control message / error log / etc).

In this case, I'd suggest similar solutions as you mentioned, to either
"filter" out the non-related messages or "branch" this topic into multiple
streams based on their data format with a String-with-manual-Json serde
first, before doing the actual processing. Kafka Streams has the
corresponding filter / branch operators in the high-level DSL for these
purposes.

Guozhang


On Mon, Jul 4, 2016 at 1:46 AM, Gaspar Muñoz <gm...@stratio.com> wrote:

> Hi there,
>
> my question is about Kafka Streams. I'm writting an application using
> Streams. I read JSON from Kafka topic and I make some transformations.
>
> I'm using
>
> Serde<JsonNode> jsonNodeSerder = Serdes.serdeFrom(new JsonSerializer(), new
> JsonDeserializer());
> KStream<String, JsonNode> kStream = builder.stream(Serdes.String(),
> jsonNodeSerder, topic);
>
> If, unluckily, we receive a non-json message, JsonDeserializer launch a
>
> throw new SerializationException("Error serializing JSON message", e);
>
> and streams threads die so the entire application dies. In order to make
> robust my application  I have to skip this errors and send the error
> message to another topic to be analyzed.
>
> Currently I've implemented MyOwnJSONDeserializer extends Kafka
> JsonDeserializer and I catch and manage the exception but this don't seems
> really clean to me.
>
> How would you manage this errors? I thought I might have used String SerDe
> and "manually" convert to JsonNode, is there any better solution?
>
> Thanks.
>



-- 
-- Guozhang