Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2021/05/03 21:17:05 UTC

Re: Kafka Stream: State replication seems unpredictable.

Hello Mohan,

Sorry for the late reply on the thread. Just to revisit your concerns here: if
in your topology there's no output at all to any topics (sink topics,
changelog topics), then yes, the zombie would not be detected; but on the
other hand the topology itself is not making any visible changes to the
external systems anyway -- you can just think of it as a zombie that keeps
doing redundant work and then drops the results on the floor, since nothing
is reflected outside.

On the other hand, if the tasks are at least writing to some sink topics,
then zombies would still be detected.


Guozhang
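
As a rough sketch of that second case: zombie fencing rides on the commit path,
and with exactly-once processing that path goes through transactional producers.
The application id, bootstrap servers, and empty topology below are placeholders,
not details from this thread.

```
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
// With exactly-once, commits go through transactional producers, so a zombie
// thread's pending transaction is aborted once a newer generation commits.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Topology topology = new Topology();   // stands in for the real topology
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
```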

On Thu, Apr 22, 2021 at 10:47 AM Parthasarathy, Mohan <mp...@hpe.com>
wrote:

> Guozhang,
>
> What does this mean if the changelog topic is disabled? If thread 2 and
> thread 4 are running on two different nodes and a rebalance occurs, thread
> 2 will not realize it is a zombie without the write to the changelog topic,
> right? I am trying to understand the cases under which the changelog topic
> can ever be disabled.
>
> Thanks
> Mohan
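
For context on the question above, disabling the changelog is an explicit choice
on the store builder; a minimal sketch (store name and serdes are made up):

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical store: without the changelog there is no remote backup of the
// state, so there is nothing shared for a zombie to corrupt -- but the state
// also cannot be restored after a migration or a crash.
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
              Stores.inMemoryKeyValueStore("agg-store"),
              Serdes.String(),
              Serdes.String())
          .withLoggingDisabled();
```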
>
>
> On 4/21/21, 10:22 PM, "Guozhang Wang" <wa...@gmail.com> wrote:
>
>     Hello Mangat,
>
>     I think using a persistent store instead of in-memory stores could help
>     if the threads are from the same instance.
>
>     Guozhang
>
>     On Tue, Apr 20, 2021 at 12:54 AM mangat rai <ma...@gmail.com>
> wrote:
>
>     > Hey Guozhang,
>     >
>     > Thanks for creating the issue. Yes, you are right, this will happen only
>     > with consecutive rebalancing, since after some time the zombie thread will
>     > stop, re-join the group, and the new thread will then overwrite the state
>     > with the latest data. In our poor infra setup, the rebalancing was
>     > happening many times in a row.
>     >
>     > Now, since we can't guarantee that consecutive rebalancing will not happen
>     > again (we reduced the fetch size, which fixed it in many ways), will any of
>     > the following work as a workaround?
>     >
>     > 1. Use a persistent store instead of in-memory (see the sketch after this
>     > list). The new thread will never get the lock, hence we will lose
>     > availability but keep things consistent.
>     > 2. Use exactly-once semantics. However, we might need to redesign our apps.
>     > It's a bigger change.
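
A sketch of what option 1 amounts to (store name and serdes are made up): a
persistent (RocksDB) store keeps its state in a task directory guarded by a file
lock, so a second thread on the same instance cannot open it while the zombie
still holds the lock.

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
              Stores.persistentKeyValueStore("agg-store"),   // was: inMemoryKeyValueStore("agg-store")
              Serdes.String(),
              Serdes.String())
          .withCachingDisabled();   // keep caching disabled, as in the current setup
```

As Guozhang notes above, this only helps when the two threads run on the same
instance; across instances the changelog is still the shared resource.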
>     >
>     > Regards,
>     > Mangat
>     >
>     > On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wa...@gmail.com>
> wrote:
>     >
>     > > Hello Mangat,
>     > >
>     > > What you've encountered is a "zombie writer" issue, that is, Thread-2 did
>     > > not know there was already a new rebalance and hence that its partitions had
>     > > been migrated out, until it tried to commit, got notified of the
>     > > illegal-generation error, and realized it was the "zombie". This case would
>     > > still persist even with incremental rebalancing.
>     > >
>     > > I've filed https://issues.apache.org/jira/browse/KAFKA-12693  to
>     > summarize
>     > > the situation. Please LMK if that explanation is clear to you.
>     > >
>     > > On Mon, Apr 19, 2021 at 12:58 AM mangat rai <ma...@gmail.com>
>     > wrote:
>     > >
>     > > > Thanks, Guozhang,
>     > > >
>     > > > I have been wrestling with Kafka's various consumer rebalancing algorithms
>     > > > for the last 2 days. Could I generalize this problem as:
>     > > >
>     > > > *Any in-memory state store backed by a changelog topic will always risk
>     > > > having interleaved writes from two different writers during rebalancing?*
>     > > >
>     > > > In our case, CPU throttling made it worse, as thread-2 didn't try to commit
>     > > > for a long time. Also,
>     > > >
>     > > > 1. Do you think that if we disable incremental rebalancing, we will not
>     > > > have this issue? If I understood correctly, Thread-4 will not start
>     > > > processing until the state is completely transferred from Thread-2.
>     > > > 2. If yes, how can we disable it without downgrading the client?
>     > > >
>     > > > Since we have a very low scale and no real-time computing requirement, we
>     > > > will be happy to sacrifice availability for consistency.
>     > > >
>     > > > Regards,
>     > > > Mangat
>     > > >
>     > > >
>     > > >
>     > > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <
> wangguoz@gmail.com>
>     > > wrote:
>     > > >
>     > > > > Hi Mangat:
>     > > > >
>     > > > > I think I found the cause of your problem here.
>     > > > >
>     > > > > It seems thread-2's partition was assigned to thread-4 while thread-2 was
>     > > > > not aware of it (because it missed a rebalance, which is a normal
>     > > > > scenario); in other words, thread-2 became a "zombie". It would stay in
>     > > > > that zombie state until it tried to commit, at which point it would get an
>     > > > > error from the brokers, realize its zombie identity, and re-join the group.
>     > > > >
>     > > > > During that period of time, before the commit was issued, it would
>     > > > > continue trying to write to its local states; here are several scenarios:
>     > > > >
>     > > > > 1) if thread-2/4 belong to two different nodes, then that is fine, since
>     > > > > they will write to different local state stores.
>     > > > > 2) if they belong to the same node, and
>     > > > >    a) the state stores are persistent, then they would have a risk of
>     > > > > contention; this is guarded by the state directory locks (as file locks),
>     > > > > in which case the new owner thread-4 should not be able to get a lock on
>     > > > > the local state files.
>     > > > >    b) the state stores are in-memory, in which case that is fine, since
>     > > > > the in-memory stores are kept separate as well.
>     > > > >
>     > > > > In your case, 2.b), the issue is that the changelog would still be shared
>     > > > > between the two --- and note that this is also true in case 1). This means
>     > > > > that at that time the changelog is shared by two writers sending records
>     > > > > interleaved. And if there's a tombstone that was intended for a record A,
>     > > > > but it was written interleaved with another record B in between, that
>     > > > > tombstone would effectively delete record B. The key here is that, when we
>     > > > > replay the changelogs, we replay them completely following offset ordering.
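
A toy illustration of that replay behaviour (the values are hypothetical): the
zombie's tombstone, intended for V1, lands in the changelog after the new owner's
V2 for the same key, so an offset-ordered restore ends up deleting V2.

```
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

List<Map.Entry<String, String>> changelog = Arrays.asList(
    new SimpleEntry<>("Key1", "V1"),   // old owner wrote V1
    new SimpleEntry<>("Key1", "V2"),   // new owner (thread-4) wrote V2
    new SimpleEntry<>("Key1", null));  // zombie (thread-2) tombstone meant for V1

Map<String, String> restored = new HashMap<>();
for (Map.Entry<String, String> record : changelog) {
    if (record.getValue() == null) {
        restored.remove(record.getKey());              // tombstone deletes whatever is there
    } else {
        restored.put(record.getKey(), record.getValue());
    }
}
// restored is now empty: the tombstone effectively deleted V2, not V1.
```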
>     > > > >
>     > > > >
>     > > > >
>     > > > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <
> mangatmodi@gmail.com>
>     > > wrote:
>     > > > >
>     > > > > > Guozhang,
>     > > > > >
>     > > > > > Yes, you are correct. We have our own group-by processor. I have more
>     > > > > > information now.
>     > > > > >
>     > > > > > 1. I added the ThreadId to the data when the app persists into the
>     > > > > > changelog topic.
>     > > > > > 2. Thread-2, which was working with partition-0, had a timeout issue.
>     > > > > > 3. Thread-4 picked up this partition-0, as I can see its Id in the
>     > > > > > changelog.
>     > > > > > 4. *But then Thread-2 and Thread-4 were both writing into partition-0
>     > > > > > of the changelog, and for the same key.*
>     > > > > >
>     > > > > > So I was clearly able to see that two threads were overwriting each
>     > > > > > other's data in the state store, leading to a corrupted state. This
>     > > > > > confirms my theory that it was an issue of concurrent updates. This was
>     > > > > > something totally unexpected. I suspect that Thread-2 continued to
>     > > > > > persist its in-memory state, maybe because it wasn't stopped after the
>     > > > > > timeout exception. Is there a configuration in Kafka Streams which could
>     > > > > > lead to this?
>     > > > > >
>     > > > > > There was no network issue; our CPU was highly throttled by Kubernetes.
>     > > > > > We gave it more resources and also decreased the fetch size, so we have
>     > > > > > a higher I/O to CPU time ratio than before, and then there was no timeout
>     > > > > > issue, hence no reassignment and hence no corrupted state.
>     > > > > >
>     > > > > > I really appreciate your help here...
>     > > > > > Thanks!
>     > > > > > Mangat
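
For reference, the knobs behind that trade-off are roughly the following (values
are made up, and this assumes the timeouts came from the consumer exceeding
max.poll.interval.ms while CPU-throttled):

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Fetch fewer records/bytes per poll so each poll's processing fits in the interval.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 256 * 1024);
// Or allow more time between polls before the member is kicked out of the group.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
```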
>     > > > > >
>     > > > > >
>     > > > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <
> wangguoz@gmail.com>
>     > > > > wrote:
>     > > > > >
>     > > > > > > Hey Mangat,
>     > > > > > >
>     > > > > > > A. With at-least-once, Streams does not guarantee atomicity of 1) /
>     > > > > > > 2); with exactly-once, atomicity is indeed guaranteed via
>     > > > > > > transactional messaging.
>     > > > > > >
>     > > > > > > B. If you are using the processor API, then I'm assuming you wrote
>     > > > > > > your own group-by processor, right? In that case, the partition key
>     > > > > > > would just be the record key when you are sending to the repartition
>     > > > > > > topic.
>     > > > > > >
>     > > > > > >
>     > > > > > > Guozhang
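
To make the keying concrete, a DSL sketch of the same idea (topic name, key shape,
and merge function are invented; the thread itself uses the low-level Processor
API): the group-by key becomes the message key of the repartition topic, and
therefore its partition key and the changelog key.

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> aggregate = builder
    .<String, String>stream("input-topic")
    // Re-key from (a) to something like (a,b,c); this key is what the
    // repartition topic and the changelog are partitioned by.
    .groupBy((key, value) -> key + "|" + value,
             Grouped.with(Serdes.String(), Serdes.String()))
    .reduce((oldValue, newValue) -> oldValue + " + " + newValue);
```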
>     > > > > > >
>     > > > > > >
>     > > > > > >
>     > > > > > >
>     > > > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <
> mangatmodi@gmail.com>
>     > > > > wrote:
>     > > > > > >
>     > > > > > > > Thanks again, that makes things clear. I still have some
>     > > questions
>     > > > > here
>     > > > > > > > then.
>     > > > > > > >
>     > > > > > > > A.  For each record we read, we do two updates:
>     > > > > > > >       1. Changelog topic of the state store.
>     > > > > > > >       2. Output topic aka sink.
>     > > > > > > >       Does the Kafka Streams app make sure that either both are
>     > > > > > > > committed or neither?
>     > > > > > > >
>     > > > > > > > B.  Our input topic actually has the key as (a,b,c), but we
>     > > > > > > > partition with only (a). We do this because we have different
>     > > > > > > > compaction requirements than partitioning requirements. It will
>     > > > > > > > still work, as all (a,b,c) records will go to the same partition.
>     > > > > > > > Now in the aggregation we group by (a,b,c). In such a case, what
>     > > > > > > > will be the partition key for the changelog topic?
>     > > > > > > >
>     > > > > > > > Note that we use low-level processor API and don't commit
>     > > > ourselves.
>     > > > > > > >
>     > > > > > > > Regards,
>     > > > > > > > Mangat
>     > > > > > > >
>     > > > > > > >
>     > > > > > > >
>     > > > > > > >
>     > > > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <
>     > wangguoz@gmail.com
>     > > >
>     > > > > > wrote:
>     > > > > > > >
>     > > > > > > > > Hi Mangat,
>     > > > > > > > >
>     > > > > > > > > Please see my replies inline below.
>     > > > > > > > >
>     > > > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <
>     > > mangatmodi@gmail.com>
>     > > > > > > wrote:
>     > > > > > > > >
>     > > > > > > > > > @Guozhang Wang
>     > > > > > > > > >
>     > > > > > > > > > Thanks for the reply. Indeed I am finding it difficult to explain
>     > > > > > > > > > this state. I checked the code many times. There can be a bug, but
>     > > > > > > > > > I fail to see it. There are several things about Kafka Streams that
>     > > > > > > > > > I don't understand, which makes it harder for me to reason.
>     > > > > > > > > >
>     > > > > > > > > > 1. What is the partition key for the changelog topics? Is it the
>     > > > > > > > > > same as the input key or the state store key? Or maybe the thread
>     > > > > > > > > > specifies the partition as it knows the input partition it is
>     > > > > > > > > > subscribed to? If the input topic and state store are differently
>     > > > > > > > > > partitioned, then we can explain the issue here.
>     > > > > > > > > >
>     > > > > > > > >
>     > > > > > > > > In Kafka Streams' changelogs, the "partition key" of the Kafka
>     > > > > > > > > messages is the same as the "message key" itself. And the message
>     > > > > > > > > key is the same as the state store key.
>     > > > > > > > >
>     > > > > > > > > Since the state store here should be storing the running aggregate,
>     > > > > > > > > it means that the partition key is the same as the aggregation key.
>     > > > > > > > >
>     > > > > > > > > If you are doing a group-by aggregation here, where the group-by
>     > > > > > > > > keys are different from the source input topic's keys, then the
>     > > > > > > > > state store keys would be different from the input topic keys.
>     > > > > > > > >
>     > > > > > > > >
>     > > > > > > > > > 2. Is there a background thread to persist the state store when
>     > > > > > > > > > caching is disabled? When will the app commit the offsets for the
>     > > > > > > > > > input topic? Is it when the sink writes into the output topic or
>     > > > > > > > > > when the state store writes into the changelog topic? Because, if
>     > > > > > > > > > the app commits the record before the data was written to the
>     > > > > > > > > > changelog topic, then we can again explain this state.
>     > > > > > > > > >
>     > > > > > > > > The commit happens *after* the local state store, as well as the
>     > > > > > > > > changelog records sent by the Streams' producers, have been flushed.
>     > > > > > > > > I.e., if there's a failure in between, you would re-process some
>     > > > > > > > > source records and hence cause duplicates, but no data loss (a.k.a.
>     > > > > > > > > at-least-once semantics).
>     > > > > > > > >
>     > > > > > > > >
>     > > > > > > > >
>     > > > > > > > > > >You may also consider upgrading to 2.6.x or higher
> version
>     > > and
>     > > > > see
>     > > > > > > if
>     > > > > > > > > this
>     > > > > > > > > > issue goes away.
>     > > > > > > > > > Do you mean the client or the Kafka broker? I will be
>     > > upgrading
>     > > > > the
>     > > > > > > > > client
>     > > > > > > > > > to 2.7.0 soon.
>     > > > > > > > > >
>     > > > > > > > > > I meant the client.
>     > > > > > > > >
>     > > > > > > > >
>     > > > > > > > > > Sadly, looking into the timestamp will not help much, as we use
>     > > > > > > > > > some business time field to set the record timestamp. If I am
>     > > > > > > > > > right, there is no way now to know when a producer wrote a record
>     > > > > > > > > > to a Kafka topic.
>     > > > > > > > > >
>     > > > > > > > > > Regards,
>     > > > > > > > > > Mangat
>     > > > > > > > > >
>     > > > > > > > > >
>     > > > > > > > > >
>     > > > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
>     > > > wangguoz@gmail.com
>     > > > > >
>     > > > > > > > wrote:
>     > > > > > > > > >
>     > > > > > > > > > > Hello Mangat,
>     > > > > > > > > > >
>     > > > > > > > > > > With at-least-once, although some records may be processed
>     > > > > > > > > > > multiple times, their processing ordering should not be violated,
>     > > > > > > > > > > so what you observed is not expected. What caught my eye is this
>     > > > > > > > > > > section in your output changelog (highlighted):
>     > > > > > > > > > >
>     > > > > > > > > > > Key1, V1
>     > > > > > > > > > > Key1, null
>     > > > > > > > > > > Key1, V1
>     > > > > > > > > > > Key1, null  (processed again)
>     > > > > > > > > > > Key1, V2
>     > > > > > > > > > > Key1, null
>     > > > > > > > > > >
>     > > > > > > > > > > *Key1, V1*
>     > > > > > > > > > > *Key1, V2*
>     > > > > > > > > > > Key1, V2+V1 (I guess we didn't process the V2 tombstone yet but
>     > > > > > > > > > > reprocessed V1 again due to reassignment)
>     > > > > > > > > > >
>     > > > > > > > > > > They seem to be the result of first receiving a
> tombstone
>     > > > which
>     > > > > > > > removes
>     > > > > > > > > > V1
>     > > > > > > > > > > and then a new record that adds V2. However, since
>     > caching
>     > > is
>     > > > > > > > disabled
>     > > > > > > > > > you
>     > > > > > > > > > > should get
>     > > > > > > > > > >
>     > > > > > > > > > > *Key1,V1*
>     > > > > > > > > > > *Key1,null*
>     > > > > > > > > > > *Key1,V2*
>     > > > > > > > > > >
>     > > > > > > > > > > instead; without the actual code snippet I cannot tell more about
>     > > > > > > > > > > what's happening here. If you can look into the logs, you can
>     > > > > > > > > > > record each time a partition migrates, how many records from the
>     > > > > > > > > > > changelog were replayed to restore the store, and from which
>     > > > > > > > > > > offset on the input topic Streams resumes processing. You may also
>     > > > > > > > > > > consider upgrading to 2.6.x or a higher version and see if this
>     > > > > > > > > > > issue goes away.
>     > > > > > > > > > >
>     > > > > > > > > > >
>     > > > > > > > > > > Guozhang
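
One way to capture exactly that bookkeeping is a state restore listener; a sketch
(the logging is illustrative, and `streams` stands for the application's
KafkaStreams instance, configured before start()):

```
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition partition, String store, long startOffset, long endOffset) {
        System.out.printf("restore start: %s %s offsets [%d, %d]%n", partition, store, startOffset, endOffset);
    }
    @Override
    public void onBatchRestored(TopicPartition partition, String store, long batchEndOffset, long numRestored) {
        System.out.printf("restored %d records of %s up to offset %d%n", numRestored, store, batchEndOffset);
    }
    @Override
    public void onRestoreEnd(TopicPartition partition, String store, long totalRestored) {
        System.out.printf("restore done: %s %s, %d records total%n", partition, store, totalRestored);
    }
});
```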
>     > > > > > > > > > >
>     > > > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
>     > > > > mangatmodi@gmail.com>
>     > > > > > > > > wrote:
>     > > > > > > > > > >
>     > > > > > > > > > > > Hey,
>     > > > > > > > > > > >
>     > > > > > > > > > > > We have the following setup in our
> infrastructure.
>     > > > > > > > > > > >
>     > > > > > > > > > > >    1. Kafka - 2.5.1
>     > > > > > > > > > > >    2. Apps use the Kafka Streams (`org.apache.kafka`) version 2.5.1 library
>     > > > > > > > > > > >    3. The low-level processor API is used with *at-least-once* semantics
>     > > > > > > > > > > >    4. State stores are *in-memory* with *caching disabled* and *changelog enabled*
>     > > > > > > > > > > >
>     > > > > > > > > > > >
>     > > > > > > > > > > > Is it possible that during state replication and
>     > > partition
>     > > > > > > > > > reassignment,
>     > > > > > > > > > > > the input data is not always applied to the state
>     > store?
>     > > > > > > > > > > >
>     > > > > > > > > > > > 1. Let's say the input topic has records like the following:
>     > > > > > > > > > > >
>     > > > > > > > > > > > ```
>     > > > > > > > > > > > Key1, V1
>     > > > > > > > > > > > Key1, null (tombstone)
>     > > > > > > > > > > > Key1, V2
>     > > > > > > > > > > > Key1, null
>     > > > > > > > > > > > Key1, V3
>     > > > > > > > > > > > Key1, V4
>     > > > > > > > > > > > ```
>     > > > > > > > > > > > 2. The app has an aggregation function which takes these records
>     > > > > > > > > > > > and updates the state store, so that the changelog should be
>     > > > > > > > > > > >
>     > > > > > > > > > > > ```
>     > > > > > > > > > > > Key1, V1
>     > > > > > > > > > > > Key1, null (tombstone)
>     > > > > > > > > > > > Key1, V2
>     > > > > > > > > > > > Key1, null
>     > > > > > > > > > > > Key1, V3
>     > > > > > > > > > > > Key1, V3 + V4
>     > > > > > > > > > > > ```
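
A rough sketch of what such an aggregation processor might look like with the
2.5.x low-level API (store name, String types, and the "+" merge are assumptions,
not the thread's actual code):

```
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregatingProcessor implements Processor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore("agg-store");
    }

    @Override
    public void process(String key, String value) {
        if (value == null) {            // tombstone: clear the running aggregate
            store.delete(key);
            return;
        }
        String current = store.get(key);
        String updated = (current == null) ? value : current + " + " + value;
        store.put(key, updated);        // this put is what ends up in the changelog
    }

    @Override
    public void close() { }
}
```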
>     > > > > > > > > > > > Let's say the partition responsible for processing the above key
>     > > > > > > > > > > > was reallocated several times to different threads due to some
>     > > > > > > > > > > > infra issues we are having (in Kubernetes, where we run the app,
>     > > > > > > > > > > > not the Kafka cluster).
>     > > > > > > > > > > >
>     > > > > > > > > > > > I see the following records in the changelog:
>     > > > > > > > > > > >
>     > > > > > > > > > > > ```
>     > > > > > > > > > > > Key1, V1
>     > > > > > > > > > > > Key1, null
>     > > > > > > > > > > > Key1, V1
>     > > > > > > > > > > > Key1, null  (processed again)
>     > > > > > > > > > > > Key1, V2
>     > > > > > > > > > > > Key1, null
>     > > > > > > > > > > > Key1, V1
>     > > > > > > > > > > > Key1,V2
>     > > > > > > > > > > > Key1, V2+V1 (I guess we didn't process the V2 tombstone yet but
>     > > > > > > > > > > > reprocessed V1 again due to reassignment)
>     > > > > > > > > > > > Key1, V1 (V2 is gone as there was a tombstone, but then the V1
>     > > > > > > > > > > > tombstone should have been applied also!!)
>     > > > > > > > > > > > Key1, V2+V1 (it is back!!!)
>     > > > > > > > > > > > Key1,V1
>     > > > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
>     > > > > > > > > > > > ```
>     > > > > > > > > > > >
>     > > > > > > > > > > > If you see this, it means several things:
>     > > > > > > > > > > > 1. The state is always correctly applied locally (on a developer
>     > > > > > > > > > > > laptop), where there were no reassignments.
>     > > > > > > > > > > > 2. The records are processed multiple times, which is
>     > > > > > > > > > > > understandable as we have at-least-once semantics here.
>     > > > > > > > > > > > 3. As long as we re-apply the same events in the same order we
>     > > > > > > > > > > > are golden, but it looks like some records are skipped; here it
>     > > > > > > > > > > > looks as if we have multiple consumers reading and updating the
>     > > > > > > > > > > > same topics, leading to race conditions.
>     > > > > > > > > > > >
>     > > > > > > > > > > > Is there any way Kafka Streams' state replication could lead to
>     > > > > > > > > > > > such a race condition?
>     > > > > > > > > > > >
>     > > > > > > > > > > > Regards,
>     > > > > > > > > > > > Mangat
>     > > > > > > > > > > >
>     > > > > > > > > > >
>     > > > > > > > > > >
>     > > > > > > > > > > --
>     > > > > > > > > > > -- Guozhang
>     > > > > > > > > > >
>     > > > > > > > > >
>     > > > > > > > >
>     > > > > > > > >
>     > > > > > > > > --
>     > > > > > > > > -- Guozhang
>     > > > > > > > >
>     > > > > > > >
>     > > > > > >
>     > > > > > >
>     > > > > > > --
>     > > > > > > -- Guozhang
>     > > > > > >
>     > > > > >
>     > > > >
>     > > > >
>     > > > > --
>     > > > > -- Guozhang
>     > > > >
>     > > >
>     > >
>     > >
>     > > --
>     > > -- Guozhang
>     > >
>     >
>
>
>     --
>     -- Guozhang
>
>
>

-- 
-- Guozhang

Re: Kafka Stream: State replication seems unpredictable.

Posted by "Parthasarathy, Mohan" <mp...@hpe.com>.
Guozhang,

Thanks for the clarification. It makes sense. As long as the output hits the broker, zombies can be detected.

-mohan
