Posted to users@kafka.apache.org by Alex Brekken <br...@gmail.com> on 2019/08/19 19:24:15 UTC

Kafka Streams incorrect aggregation results when re-balancing occurs

Hi all, I have a (relatively) simple streams topology that is producing
some counts, and while testing this code I'm seeing some occasional
incorrect aggregation results.  This seems to happen when a re-balance
occurs - typically due to a timeout or communication hiccup with the Kafka
broker.  The topology is built with the DSL, and utilizes 2 KTables: the
first is really just a de-dup table and the second is the result of the
aggregation.  So at a high level the topology consumes from a source topic,
calls groupByKey() and then does a reduce() where we always return the
newValue.  Then it does a groupBy() on a new key, and finally an
aggregate() call with an adder and subtractor.  Because our source topic
frequently contains duplicate messages, this seemed like a good way to
handle the dupes: the subtractor gets invoked anytime we replace a value in
the "upstream" KTable and removes it from the count, then adds it back
again in the adder.
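
To make the de-dup idea above concrete, here is a minimal plain-Java model of the two tables. It is only a sketch and does not use the Kafka Streams API: the class name, the `process` method and the regrouping function are hypothetical stand-ins for the reduce(), groupBy() and aggregate() steps described in the message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the two-table topology; no Kafka classes are involved.
class DedupCountModel {
    // First "table": latest value per source key (a reduce that always keeps newValue).
    private final Map<String, String> latestByKey = new HashMap<>();
    // Second "table": count per regrouped key, maintained by an adder and a subtractor.
    private final Map<String, Long> countByGroup = new HashMap<>();
    // Hypothetical key selector standing in for the groupBy() step.
    private final Function<String, String> groupOf;

    DedupCountModel(Function<String, String> groupOf) {
        this.groupOf = groupOf;
    }

    void process(String key, String newValue) {
        String oldValue = latestByKey.put(key, newValue);
        if (oldValue != null) {
            // Subtractor: the replaced value no longer contributes to its group.
            countByGroup.merge(groupOf.apply(oldValue), -1L, Long::sum);
        }
        // Adder: the new value contributes to its group.
        countByGroup.merge(groupOf.apply(newValue), 1L, Long::sum);
    }

    long count(String group) {
        return countByGroup.getOrDefault(group, 0L);
    }

    public static void main(String[] args) {
        DedupCountModel model = new DedupCountModel(value -> value);
        model.process("order-1", "red");
        model.process("order-1", "red");   // duplicate: subtract 1, then add 1
        model.process("order-2", "red");
        model.process("order-3", "blue");
        System.out.println("red=" + model.count("red") + " blue=" + model.count("blue"));
    }
}
```

In this model a duplicate for an existing key nets out to zero, which matches the happy-path behaviour described above.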

In the happy-path scenario where we never see any exceptions or rebalances,
this whole thing works great - the counts at the end are 100% correct.  But
when rebalancing is triggered we sometimes get bad counts. My theory is
that when a timeout or connectivity problem happens during that second
aggregation, the data ends up getting saved to the state store but the
offsets don't get committed and the message(s) in the repartition topic
that feed the aggregation get replayed after the stream task gets
rebalanced, causing the counts to get incorrectly incremented or
decremented (depending on whether the message was triggering the adder or
the subtractor).  I can simulate this problem (or something similar to this
problem) when debugging the application in my IDE just by pausing execution
on a breakpoint inside the aggregation's adder or subtractor method for a
few seconds.  The result of the adder or subtractor gets saved to the state
store which means that when the messages in the repartition topic get
re-processed, the counts get doubled.  If I enable "exactly_once"
processing, I'm unable to recreate the problem and the counts are always
accurate.
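
The theory above can be sketched as a small plain-Java simulation. This is a simplified model under stated assumptions, not Kafka Streams code: the "repartition topic" is a list of +1/-1 deltas (adder/subtractor calls), the state store is a single long that survives the failure, and the offset is committed after every record, whereas a real application commits on an interval. All names are hypothetical.

```java
import java.util.List;

// Model of at-least-once processing: the state write and the offset commit
// are two separate steps, so a failure between them causes a replay.
class AtLeastOnceReplay {
    // crashAfterIndex: index of the record whose state write succeeds but whose
    // offset is never committed; pass -1 to simulate a run without failures.
    static long run(List<Integer> deltas, int crashAfterIndex) {
        long count = 0;            // state store contents (survive the failure)
        int committedOffset = 0;   // where processing resumes after a rebalance
        int offset = 0;
        boolean crashed = false;
        while (offset < deltas.size()) {
            count += deltas.get(offset);      // adder/subtractor result is saved
            if (offset == crashAfterIndex && !crashed) {
                crashed = true;
                offset = committedOffset;     // rebalance: replay uncommitted records
                continue;
            }
            offset++;
            committedOffset = offset;         // commit only after the state write
        }
        return count;
    }

    public static void main(String[] args) {
        List<Integer> deltas = List.of(1, 1, 1);
        System.out.println("no failure:   " + run(deltas, -1));
        System.out.println("with failure: " + run(deltas, 1));
    }
}
```

In the failing run the second record is applied twice, so the final count is one higher than the number of records; with a -1 delta at that position it would be one lower.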

My questions are:

1.  Is this expected behavior? In a hostile application environment where
connectivity problems and rebalances happen frequently, is some degree of
incorrectly aggregated data just a reality of life?

2.  Is exactly_once processing the right solution if correctness is of
highest importance?  Or should I be looking at different ways of writing
the topology?

Thanks for any advice!

Alex

RE: Kafka Streams incorrect aggregation results when re-balancing occurs

Posted by Tim Ward <ti...@origamienergy.com.INVALID>.
I asked an essentially similar question a week or two ago. The answer was "this is expected behaviour unless you switch on exactly-once processing".

(In my case it was solved by changing the topology, which I had to do for other, unconnected, reasons (the requirements for the application changed when I was part way through writing it).)

Tim Ward


Re: Kafka Streams incorrect aggregation results when re-balancing occurs

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> So with exactly_once, it must roll back commit(s) to the state store in a
> failure scenario?

Yes. Dirty writes into the stores are "cleaned up" if you enable
exactly-once processing semantics.

A "commit" is never rolled back, as a commit indicates successful
processing :)
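
As a rough illustration of the "clean up" described above, here is a plain-Java model in which the state update and the input offset are committed together, and anything written since the last commit is discarded on failure. It is a sketch of the idea only and makes no claim about how Kafka Streams implements it internally; the class and variable names are hypothetical, and committing after every record is a simplification.

```java
import java.util.List;

// Model of exactly-once processing: state and offset are committed as one
// unit, and uncommitted ("dirty") state is thrown away after a failure.
class ExactlyOnceModel {
    // crashAfterIndex: index of the record whose state write happens but whose
    // commit never does; pass -1 to simulate a run without failures.
    static long run(List<Integer> deltas, int crashAfterIndex) {
        long committedCount = 0;   // last committed state
        int committedOffset = 0;   // last committed input position
        long count = 0;            // working state, may contain dirty writes
        int offset = 0;
        boolean crashed = false;
        while (offset < deltas.size()) {
            count += deltas.get(offset);      // dirty write, not yet committed
            if (offset == crashAfterIndex && !crashed) {
                crashed = true;
                count = committedCount;       // dirty write is cleaned up
                offset = committedOffset;     // input is replayed from the commit
                continue;
            }
            offset++;
            committedCount = count;           // state and offset are committed
            committedOffset = offset;         // together, never one without the other
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("with failure: " + run(List.of(1, 1, 1), 1));
    }
}
```

The replay still happens, but it starts from the committed state, so each record affects the count once.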


-Matthias


Re: Kafka Streams incorrect aggregation results when re-balancing occurs

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Alex,

if you are interested in understanding exactly-once in a bit more
detail, I recommend watching the following Kafka Summit talk by
Matthias:

https://www.confluent.io/kafka-summit-london18/dont-repeat-yourself-introducing-exactly-once-semantics-in-apache-kafka

Best,
Bruno


Re: Kafka Streams incorrect aggregation results when re-balancing occurs

Posted by Alex Brekken <br...@gmail.com>.
Thanks guys.  I knew that re-processing messages was a possibility with
at_least_once processing, but I guess I hadn't considered the potential
impact on the state stores as far as aggregations are concerned.  So with
exactly_once, it must roll back commit(s) to the state store in a failure
scenario?  I haven't dug into the code to see how it works, but given the
behavior I'm seeing it must.

Tim - I actually saw your related question from last week right after I
sent mine.  :)

Alex


Re: Kafka Streams incorrect aggregation results when re-balancing occurs

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Alex,

what you describe about failing before offsets are committed is one
reason why records are processed multiple times under the
at-least-once processing guarantee. That is a reality of life, as you
stated. Kafka Streams in exactly-once mode guarantees that these
duplicate state updates do not happen.

The exactly-once processing guarantee was implemented in Kafka Streams
for use cases where correctness is of highest importance.
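
For reference, a minimal sketch of switching the guarantee on through configuration, written with plain java.util.Properties and string keys so that it compiles without the Kafka libraries. The application id and broker address are hypothetical placeholders. "exactly_once" is the value used in this thread; newer Kafka releases name the value differently, so check the documentation for your version.

```java
import java.util.Properties;

// Builds the configuration that would be passed to a Kafka Streams application.
class StreamsProps {
    static Properties exactlyOnceProps() {
        Properties props = new Properties();
        props.put("application.id", "counts-app");         // hypothetical name
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        // The default guarantee is at_least_once; this opts in to exactly-once.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(exactlyOnceProps().getProperty("processing.guarantee"));
    }
}
```

These keys correspond to the APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG and PROCESSING_GUARANTEE_CONFIG constants in StreamsConfig.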

Best,
Bruno


