Posted to users@kafka.apache.org by Carl Graving <cg...@gmail.com> on 2020/04/21 02:59:23 UTC

Deserialization Exception when performing state store operations

I have a state store being built from a stream with a custom set of value
serdes. In the stream processing I am able to handle errors appropriately
with the exception handler, but if I do a getAll on the state store and
use the iterator's hasNext(), next(), or peek methods, any deserialization
exceptions are thrown. This makes it difficult to iterate over the items in
the state store and skip or remove bad entries. I can see different ways
this could happen, such as the Avro schema for an item in the state store
being removed from the schema registry (and its cache). Am I missing an easy
way to deal with deserialization exceptions stemming from state store
iterators or gets? I will keep experimenting, but it has been hard to find a
way to reliably use the iterator (getAll) when hasNext, next, peekNext, etc.
all throw exceptions.
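The "exception handler" mentioned above is presumably Kafka Streams' deserialization exception handler. A minimal sketch of enabling it is shown below, assuming plain string config keys so the snippet compiles without the Kafka jar; the class name `StreamsErrorConfig` and the `build` helper are hypothetical. `default.deserialization.exception.handler` is the key used by Kafka Streams releases from around the time of this thread (newer releases may rename it), and `LogAndContinueExceptionHandler` is a built-in handler that logs and skips bad records. This handler is consulted when records are read from input topics; as described above, it does not appear to intercept exceptions thrown while iterating a state store.

```java
import java.util.Properties;

// Hypothetical helper: builds Streams properties using raw string keys,
// so it runs without Kafka on the classpath.
final class StreamsErrorConfig {
    // Same key as StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
    // in Kafka Streams versions of this era.
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Built-in handler that logs and skips source records that fail to deserialize.
    static final String LOG_AND_CONTINUE =
        "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    private StreamsErrorConfig() {}

    static Properties build(String appId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", appId);
        props.put("bootstrap.servers", bootstrapServers);
        // Covers records consumed from source topics only; values read back
        // through a state store iterator use the store's serde directly.
        props.put(HANDLER_KEY, LOG_AND_CONTINUE);
        return props;
    }
}
```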

Thanks for any help or pointers on how to properly handle exceptions in
this case.

Carl

Re: Deserialization Exception when performing state store operations

Posted by Carl Graving <cg...@gmail.com>.
Yeah, Avro was just an example of how it could happen; a more accurate
example would be a schema that was accidentally removed somehow. Basically,
the serializer worked when writing to the state store, but deserializing now
throws an exception. Of course it is a corner case; I was just trying to see
if there was a better way to handle a message that made its way into the
state store as a good message and then became bad because the environment
changed.

I don't want to handle the error in the deserializer itself, because it is
used by multiple services and they handle errors differently. I could wrap it
for just the store case, as you suggested.

Carl

On Mon, Apr 20, 2020, 23:40 John Roesler <vv...@apache.org> wrote:


Re: Deserialization Exception when performing state store operations

Posted by John Roesler <vv...@apache.org>.
Hi Carl,

That sounds pretty frustrating; sorry about that.

I think I got a hint, but I'm not totally clear on the situation. It shouldn't
be possible for data to get into the store if it can't be handled by the serde.
There is a specific issue with global stores, but it doesn't sound like that's
your situation.

It sounds like what has happened is that the serde itself has "forgotten"
how to handle data that it previously could handle, because someone
deleted the schema from the registry. Realistically, this sounds more like
a situation to prevent than one to handle. From an "ivory tower" perspective,
it seems like you could:
* establish a rule that once the schema has been used, it can only be
  evolved
* or establish a deprecation period in which both the new and old
  schemas are present, so you can translate all the records
* or if you really need to make a discontinuous change to the schema, it
  probably means that the store is now completely different in meaning, and
  you could just give it a different name and load it from scratch

But if nothing like that fits the bill, then you could consider wrapping the Avro
serde in an error-handling serde that delegates to your Avro serde, catches
the deserialization exception, and instead returns a sentinel value that you
can then handle somehow?
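A minimal sketch of the error-handling wrapper suggested above, assuming a local `Deserializer<T>` interface as a hypothetical stand-in for Kafka's `org.apache.kafka.common.serialization.Deserializer` (same `deserialize(String topic, byte[] data)` shape) so the example compiles on its own. `SentinelDeserializer`, `isSentinel`, and the `onError` callback are illustrative names, not Kafka API. It catches `RuntimeException`, which includes Kafka's `SerializationException`, reports the failure, and returns a caller-supplied sentinel. The sentinel is required to be non-null because a store `get` already returns null to mean "no value for this key".

```java
import java.util.Objects;
import java.util.function.BiConsumer;

// Hypothetical stand-in for Kafka's Deserializer interface.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Delegates to the wrapped deserializer; if it throws, reports the raw bytes
// and the exception, then returns the sentinel instead of propagating.
final class SentinelDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> delegate;
    private final T sentinel;
    private final BiConsumer<byte[], RuntimeException> onError;

    SentinelDeserializer(Deserializer<T> delegate, T sentinel,
                         BiConsumer<byte[], RuntimeException> onError) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.sentinel = Objects.requireNonNull(sentinel, "sentinel");
        this.onError = Objects.requireNonNull(onError, "onError");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (RuntimeException e) {
            // e.g. a schema that can no longer be fetched from the registry
            onError.accept(data, e);
            return sentinel;
        }
    }

    // Identity check: the sentinel is a single, dedicated instance.
    boolean isSentinel(T value) {
        return value == sentinel;
    }
}
```

In a real topology, one option would be to pair the wrapped deserializer with the existing serializer (for example via `Serdes.serdeFrom(serializer, deserializer)`) and pass the result only to the store's `Materialized` configuration (for example with `withValueSerde`), leaving the shared Avro serde untouched for other services.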

I hope this helps,
Thanks,
-John

On Mon, Apr 20, 2020, at 21:59, Carl Graving wrote:

Re: Deserialization Exception when performing state store operations

Posted by Carl Graving <cg...@gmail.com>.
I have handled it everywhere; it just doesn't seem possible with the state
store get. I can wrap a get in a try/catch, but with getAll it seems hard
to skip or remove the bad entries.
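Once a failed decode is caught at the deserialization step, as the wrapping serde suggested earlier in the thread would do, rather than escaping from the iterator's `next()`, skipping and removing bad entries becomes a two-pass job. The sketch below assumes a `Map` of raw bytes as a hypothetical stand-in for the state store and a plain `Function` as the possibly-throwing value deserializer; `StoreScrubber`, `scan`, and `removeAll` are illustrative names. It keeps the values that decode, collects the keys whose values throw, and deletes those keys only after the scan has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: the raw map stands in for a key-value state store.
final class StoreScrubber {
    private StoreScrubber() {}

    // Walks every entry, keeps values that decode, and records the keys of
    // values whose decode throws, so one bad record does not abort the scan.
    static <V> List<String> scan(Map<String, byte[]> raw,
                                 Function<byte[], V> decode,
                                 Map<String, V> goodOut) {
        List<String> badKeys = new ArrayList<>();
        for (Map.Entry<String, byte[]> e : raw.entrySet()) {
            try {
                goodOut.put(e.getKey(), decode.apply(e.getValue()));
            } catch (RuntimeException ex) {
                badKeys.add(e.getKey());
            }
        }
        return badKeys;
    }

    // Deletes the collected keys after the scan, rather than mutating the
    // store while its iterator is still open.
    static void removeAll(Map<String, byte[]> raw, List<String> badKeys) {
        for (String key : badKeys) {
            raw.remove(key);
        }
    }
}
```

In Kafka Streams itself, the deletion would presumably need to happen somewhere with write access, such as a processor calling `KeyValueStore.delete(key)`, since stores obtained through interactive queries are read-only; the store's `KeyValueIterator` is `Closeable` and should be closed once the scan is done.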

Thanks

On Tue, Apr 21, 2020, 01:38 Ezequiel Puig <es...@gmail.com> wrote:


Re: Deserialization Exception when performing state store operations

Posted by Ezequiel Puig <es...@gmail.com>.
Hello Carl,

I think that the problem you are facing is a "poison pill".

You can take a look at this video for different approaches to solving it:
https://www.confluent.io/kafka-summit-san-francisco-2019/streaming-apps-and-poison-pills-handle-the-unexpected-with-kafka-streams/

Best regards,

Ezequiel PUIG

On Tue, Apr 21, 2020, at 04:59, Carl Graving <cg...@gmail.com> wrote:
