Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2019/07/05 23:30:12 UTC

Message reprocessing logic

Hello everyone,

I'm looking into a way to reprocess messages in case of soft errors (not
exceptions).
For example we have a topology that does this:
    input stream -> filtering/flatmap -> window and aggregate

In our aggregate step (which should perhaps be moved into an additional
step) we make an API call to one of our services.

What I would like to do is reprocess that message, or even better, if
possible, just the window computation, when the API call fails.

From reading
https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processing-guarantees
my understanding is that, with the default at-least-once semantics, throwing
an exception makes the topology reprocess the messages after the last commit.
Is it possible instead to soft-retry only the last message, without throwing
an exception and possibly also reprocessing older messages that were already
processed correctly?

Also, if my topology starts from a stream and uses multiple stores before
windowing, and there's an error in the windowing step, what happens to the
store changes? When the message is reprocessed, will the stores be in the
state they were in after processing the message on the first try?

Thank you in advance

--
Alessandro Tagliapietra

Re: Message reprocessing logic

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
Hi John,

thanks a lot for the great explanation and the links.
After I sent the question I researched EOS a bit more, and I'm
currently testing it out.

I'll read those links and see what I come up with!

Thanks and have a great day!

--
Alessandro Tagliapietra

On Tue, Jul 9, 2019 at 2:45 PM John Roesler <jo...@confluent.io> wrote:

> Hi Alessandro,
>
> Sorry if I'm missing some of the context, but could you just keep
> retrying the API call inside a loop? This would block any other
> processing by the same thread, but it would allow Streams to stay up
> in the face of transient failures. Otherwise, I'm afraid that throwing
> an exception is the right thing to do. Streams would re-process the
> record in question when it starts back up, but you'd have to re-start
> it. You can do that programmatically, but it's a bit heavyweight as a
> response to a transient API call failure.
>
> For reference, this is one of several problems that comes up when you
> need to call out to external services during processing. Streams
> currently lacks full support to make this a really pleasant
> experience, but it's a perennial topic of discussion. See
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams
> and
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
> for a couple of attempts to wrestle with the domain.
>
> To answer your latter question, the store should be returned to its
> prior state when you restart, but if you want to be absolutely sure
> this happens, you need to enable EOS. That will have the side-effect
> of discarding any local state after a crash, though, which makes the
> "crash and recover" strategy even more heavyweight.
>
> I'd recommend wrapping the API call in a retry loop that's as long as
> you can tolerate and then crashing if you still don't get through. Be
> sure to also look through the docs and find any heartbeat configs you
> need to set. Off the top of my head, I think "max poll interval" at
> least needs to be set bigger than your maximum expected pause.
> Probably 2x the total retry-loop time would be a good choice.
>
> I hope this helps,
> -John

Re: Message reprocessing logic

Posted by John Roesler <jo...@confluent.io>.
Hi Alessandro,

Sorry if I'm missing some of the context, but could you just keep
retrying the API call inside a loop? This would block any other
processing by the same thread, but it would allow Streams to stay up
in the face of transient failures. Otherwise, I'm afraid that throwing
an exception is the right thing to do. Streams would re-process the
record in question when it starts back up, but you'd have to re-start
it. You can do that programmatically, but it's a bit heavyweight as a
response to a transient API call failure.
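
A minimal sketch of such a blocking retry loop in plain Java, for
illustration only. The class name BlockingRetry and the method callWithRetry
are hypothetical and not part of Kafka Streams; the sketch assumes the API
call reports a soft failure by returning an empty Optional rather than by
throwing.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Streams.
final class BlockingRetry {
    private BlockingRetry() {}

    /**
     * Calls `attempt` until it returns a non-empty result or `maxAttempts`
     * is used up. An empty Optional models a soft failure (no exception).
     * Sleeps for `backoff` between attempts, blocking the calling thread.
     * After the last failed attempt it throws, so the caller can let the
     * exception propagate and stop processing.
     */
    static <T> T callWithRetry(Supplier<Optional<T>> attempt,
                               int maxAttempts,
                               Duration backoff) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int i = 1; i <= maxAttempts; i++) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result.get();
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying", e);
                }
            }
        }
        throw new IllegalStateException(
                "API call still failing after " + maxAttempts + " attempts");
    }
}
```

Inside the processing step this could be used as
BlockingRetry.callWithRetry(() -> callMyService(value), 5, Duration.ofSeconds(2)),
where callMyService stands in for the real API client.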

For reference, this is one of several problems that comes up when you
need to call out to external services during processing. Streams
currently lacks full support to make this a really pleasant
experience, but it's a perennial topic of discussion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams
and https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
for a couple of attempts to wrestle with the domain.

To answer your latter question, the store should be returned to its
prior state when you restart, but if you want to be absolutely sure
this happens, you need to enable EOS. That will have the side-effect
of discarding any local state after a crash, though, which makes the
"crash and recover" strategy even more heavyweight.

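As a rough sketch of what enabling EOS looks like in configuration: the
"processing.guarantee" key is the Streams setting in question, and
"exactly_once" is the value used by the releases current at the time of this
thread (newer releases use "exactly_once_v2", so check the docs for your
version). The class name EosConfigSketch and the example argument values are
made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper that only builds the Properties object;
// it would be passed to the KafkaStreams constructor elsewhere.
final class EosConfigSketch {
    private EosConfigSketch() {}

    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once"; this switches to exactly-once.
        // Assumption: a release that accepts "exactly_once" as a value.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }
}
```

For example, EosConfigSketch.streamsProperties("my-app", "localhost:9092")
with placeholder values for the application id and broker address.
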
I'd recommend wrapping the API call in a retry loop that's as long as
you can tolerate and then crashing if you still don't get through. Be
sure to also look through the docs and find any heartbeat configs you
need to set. Off the top of my head, I think "max poll interval" at
least needs to be set bigger than your maximum expected pause.
Probably 2x the total retry-loop time would be a good choice.
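
To make the 2x rule of thumb concrete, here is a small sketch that derives a
value for the consumer setting max.poll.interval.ms from the retry
parameters. The class and method names are hypothetical, and the worst-case
formula assumes every attempt runs into its full per-call timeout with a
fixed sleep between attempts.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper for sizing max.poll.interval.ms.
final class PollIntervalSketch {
    private PollIntervalSketch() {}

    /** Worst case: maxAttempts timed-out calls plus (maxAttempts - 1) sleeps. */
    static long worstCaseRetryMillis(int maxAttempts, Duration perCallTimeout, Duration backoff) {
        return maxAttempts * perCallTimeout.toMillis()
                + (maxAttempts - 1) * backoff.toMillis();
    }

    /** Sets max.poll.interval.ms to twice the worst-case retry time. */
    static Properties withPollInterval(Properties props, long worstCaseMillis) {
        long interval = Math.multiplyExact(2L, worstCaseMillis);
        if (interval > Integer.MAX_VALUE) {
            // The setting is an int-valued config.
            throw new IllegalArgumentException("interval too large: " + interval);
        }
        props.put("max.poll.interval.ms", String.valueOf(interval));
        return props;
    }
}
```

With 5 attempts, a 10 second per-call timeout and a 2 second sleep, the
worst case is 5 * 10000 + 4 * 2000 = 58000 ms, so the setting becomes
116000 ms.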

I hope this helps,
-John
