Posted to users@kafka.apache.org by mangat rai <ma...@gmail.com> on 2021/04/06 15:38:05 UTC

Kafka Stream: State replication seems unpredictable.

Hey,

We have the following setup in our infrastructure.

   1. Kafka - 2.5.1
   2. Apps use the Kafka Streams (`org.apache.kafka`) library, version 2.5.1
   3. The low-level Processor API is used with *at-least-once* semantics
   4. State stores are *in-memory* with *caching disabled* and *changelog
   enabled* (a minimal setup sketch follows below)
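
Roughly, the setup is wired up like the following sketch (the store, topic,
and class names such as `agg-store`, `input-topic`, and `AggregatingProcessor`
are simplified placeholders for illustration, not our real ones):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

public class TopologySetup {
    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("input-source", "input-topic");
        topology.addProcessor("aggregator", AggregatingProcessor::new, "input-source");
        // In-memory store, caching disabled, changelog enabled (points 3 and 4 above)
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("agg-store"),
                    Serdes.String(), Serdes.String())
                .withCachingDisabled()
                .withLoggingEnabled(Collections.emptyMap()),
            "aggregator");
        topology.addSink("output-sink", "output-topic", "aggregator");
        return topology;
    }

    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // at-least-once is the default processing guarantee, spelled out here
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        return props;
    }
}
```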


Is it possible that during state replication and partition reassignment,
the input data is not always applied to the state store?

1. Let's say the input topic has records like the following

```
Key1, V1
Key1, null (tombstone)
Key1, V2
Key1, null
Key1, V3
Key1, V4
```
2. The app has an aggregation function which takes these records and updates
the state store, so the changelog should be as follows (a sketch of such a
processor appears after this example)

```
Key1, V1
Key1, null (tombstone)
Key1, V2
Key1, null
Key1, V3
Key1, V3 + V4
```
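
A rough sketch of the aggregation processor (again with placeholder names,
and assuming string values with simple concatenation as the merge):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregatingProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("agg-store");
    }

    @Override
    public void process(String key, String value) {
        if (value == null) {
            // Tombstone: drop the running aggregate; the delete is also
            // written to the changelog topic as (key, null).
            store.delete(key);
            return;
        }
        String current = store.get(key);
        String updated = (current == null) ? value : current + " + " + value;
        store.put(key, updated);          // goes to the changelog as (key, updated)
        context().forward(key, updated);  // and downstream to the sink
    }
}
```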
Let's say the partition responsible for processing the above key was
reallocated to different threads several times due to some infra issues we
are having (in Kubernetes, where we run the app, not in the Kafka cluster).

I see the following records in the changelog:

```
Key1, V1
Key1, null
Key1, V1
Key1, null  (processed again)
Key1, V2
Key1, null
Key1, V1
Key1,V2
Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
again due to reassignment)
Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone should
have been applied also!!)
Key1, V2+V1 (it is back!!!)
Key1,V1
Key1, V1 + V2 + V3 (This is the final state)!
```

As you can see, this means several things:
1. The state is always correctly applied locally (on a developer laptop),
where there were no reassignments.
2. The records are processed multiple times, which is understandable as we
have at-least-once semantics here.
3. As long as we re-apply the same events in the same order we are golden,
but it looks as if some records are skipped, or as if we have multiple
consumers reading and updating the same topic, leading to race conditions
(a toy illustration of such interleaving is sketched below).
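
To illustrate what I suspect is happening, here is a small self-contained
toy model (not our actual code): two writers append interleaved records for
the same key to one changelog partition, and restoration simply replays that
log in offset order, so a tombstone meant for an older value can wipe out a
newer one:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogRaceToy {
    public static void main(String[] args) {
        // Shared changelog partition: an ordered list of (key, value) records.
        List<Map.Entry<String, String>> changelog = new ArrayList<>();

        // Writer A (the old owner) and writer B (the new owner) both think
        // they own the partition and write interleaved records for Key1.
        changelog.add(new SimpleEntry<>("Key1", "V1"));   // A: put V1
        changelog.add(new SimpleEntry<>("Key1", "V2"));   // B: put V2
        changelog.add(new SimpleEntry<>("Key1", null));   // A: tombstone meant for V1

        // Restoration replays the changelog strictly in offset order.
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                restored.remove(record.getKey());
            } else {
                restored.put(record.getKey(), record.getValue());
            }
        }
        // The tombstone A intended for V1 has wiped out B's V2 instead.
        System.out.println(restored); // prints {}
    }
}
```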

Is there any way Kafka Streams' state replication could lead to such a
race condition?

Regards,
Mangat

Re: Kafka Stream: State replication seems unpredictable.

Posted by "Parthasarathy, Mohan" <mp...@hpe.com>.
Guozhang,

Thanks for the clarification. It makes sense. As long as the output hits the broker, zombies can be detected.

-mohan

On 5/3/21, 2:17 PM, "Guozhang Wang" <wa...@gmail.com> wrote:

    Hello Mohan,
    
    Sorry for getting late on the thread. Just to revive your concerns here: if
    in your topology there's no output at all to any topics (sink topics,
    changelog topics), then yes the zombie would not be detected; but on the
    other hand the topology itself is not make any visible changes to the
    external systems anyways -- you can just think of a zombie who's keeping
    doing redundant work and then drop the results on the floor since nothing
    was reflected outside.
    
    On the other hand, if the tasks are at least writing to some sink topics,
    then zombies would still be detected.
    
    
    Guozhang
    
    On Thu, Apr 22, 2021 at 10:47 AM Parthasarathy, Mohan <mp...@hpe.com>
    wrote:
    
    > Guozhang,
    >
    > What does this mean if the changelog topic was disabled ? If thread 2 and
    > thread 4 are running in two different nodes and a rebalance occurs, thread
    > 2 will not realize it is a zombie without the write to the changelog topic,
    > right ? I am trying to understand the cases under which the changelog topic
    > can ever be disabled.
    >
    > Thanks
    > Mohan
    >
    >
    > On 4/21/21, 10:22 PM, "Guozhang Wang" <wa...@gmail.com> wrote:
    >
    >     Hello Mangat,
    >
    >     I think using persistent store that relies on in-memory stores could
    > help
    >     if the threads are from the same instance.
    >
    >     Guozhang
    >
    >     On Tue, Apr 20, 2021 at 12:54 AM mangat rai <ma...@gmail.com>
    > wrote:
    >
    >     > Hey Guozhang,
    >     >
    >     > Thanks for creating the issue. Yes, you are right, this will happen
    > only
    >     > with the consecutive rebalancing as after some time zombie thread
    > will stop
    >     > and re-join the group and the new thread will always overwrite the
    > state
    >     > with the latest data. In our poor infra setup, the rebalancing was
    >     > happening many times in a row.
    >     >
    >     > Now, we can't guarantee that the consecutive rebalancing will not
    > happen
    >     > again (we reduced fetch-size which fixed it in many ways), will any
    > of the
    >     > following work as a workaround?
    >     >
    >     > 1. Use persistent store instead of in-memory. The new thread will
    > never get
    >     > the lock hence we will lose availability but keep things consistent.
    >     > 2. Use exactly-once semantics. However, we might need to redesign
    > our apps.
    >     > It's a bigger change.
    >     >
    >     > Regards,
    >     > Mangat
    >     >
    >     > On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wa...@gmail.com>
    > wrote:
    >     >
    >     > > Hello Mangat,
    >     > >
    >     > > What you've encountered is a "zombie writer" issue, that is,
    > Thread-2 did
    >     > > not know there's already a new rebalance and hence its partitions
    > have
    >     > been
    >     > > migrated out, until it tries to commit and then got notified of the
    >     > > illegal-generation error and realize itself is the "zombie"
    > already. This
    >     > > case would still persist even with incremental rebalancing.
    >     > >
    >     > > I've filed https://issues.apache.org/jira/browse/KAFKA-12693   to
    >     > summarize
    >     > > the situation. Please LMK if that explanation is clear to you.
    >     > >
    >     > > On Mon, Apr 19, 2021 at 12:58 AM mangat rai <ma...@gmail.com>
    >     > wrote:
    >     > >
    >     > > > Thanks, Guozhang,
    >     > > >
    >     > > > I was knocking myself with Kafka's various consumer rebalancing
    >     > > algorithms
    >     > > > in the last 2 days. Could I generalize this problem as
    >     > > >
    >     > > >
    >     > > >
    >     > > > *Any in-memory state store backed by a changelog topic will
    > always risk
    >     > > > having interleaved writes from two different writers during
    >     > rebalancing?*
    >     > > > In our case, CPU throttling made it worse as thread-2 didn't try
    > to
    >     > > commit
    >     > > > for a long time. Also,
    >     > > >
    >     > > > 1. Do you think if we disable the incremental rebalancing, we
    > will not
    >     > > have
    >     > > > this issue because If I understood correctly Thread-4 will not
    > start
    >     > > > processing until the state is completely transferred from
    > Thread-2.
    >     > > > 2. If yes, how can we disable it without downgrading the client?
    >     > > >
    >     > > > Since we have a very low scale and no real-time computing
    > requirement,
    >     > we
    >     > > > will be happy to sacrifice the availability to have consistency.
    >     > > >
    >     > > > Regards,
    >     > > > Mangat
    >     > > >
    >     > > >
    >     > > >
    >     > > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <
    > wangguoz@gmail.com>
    >     > > wrote:
    >     > > >
    >     > > > > Hi Mangat:
    >     > > > >
    >     > > > > I think I found the issue of your problem here.
    >     > > > >
    >     > > > > It seems thread-2's partition was assigned to thread-4 while
    > thread-2
    >     > > was
    >     > > > > not aware (because it missed a rebalance, this is normal
    > scenario);
    >     > in
    >     > > > > other words, thread2 becomes a "zombie". It would stay in that
    > zombie
    >     > > > state
    >     > > > > until it tried to commit, in which it would get an error from
    > the
    >     > > brokers
    >     > > > > and realize its zombie identity and re-joins the group.
    >     > > > >
    >     > > > > During that period of time, before the commit was issued, it
    > would
    >     > > > continue
    >     > > > > trying to write to its local states; here are several
    > scenarios:
    >     > > > >
    >     > > > > 1) if thread-2/4 are belonging to two different nodes then
    > that is
    >     > > fine,
    >     > > > > since they will write to different local state stores.
    >     > > > > 2) if they belong to the same nodes, and
    >     > > > >    a) the state stores are persistent then they would have
    > risks of
    >     > > > > contention; this is guarded by the state directory locks (as
    > file
    >     > > locks)
    >     > > > in
    >     > > > > which case the new owner thread-4 should not be able to get on
    > the
    >     > > local
    >     > > > > state files.
    >     > > > >    b) the state stores are in-memory, in which case that is
    > fine
    >     > since
    >     > > > the
    >     > > > > in-memory stores are kept separate as well.
    >     > > > >
    >     > > > > In your case: 2.b), the issue is that the changelog would
    > still be
    >     > > shared
    >     > > > > between the two --- but note that this is the same case as in
    > case 1)
    >     > > as
    >     > > > > well. And this means at that time the changelog is shared by
    > two
    >     > > writers
    >     > > > > sending records interleaving. And if there’s a tombstone that
    > was
    >     > > > intended
    >     > > > > for a record A, but when it was written interleaving and
    > there’s
    >     > > another
    >     > > > > record B in between, that tombstone would effectively delete
    > record
    >     > B.
    >     > > > The
    >     > > > > key here is that, when we replay the changelogs, we replay it
    >     > > completely
    >     > > > > following offset ordering.
    >     > > > >
    >     > > > >
    >     > > > >
    >     > > > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <
    > mangatmodi@gmail.com>
    >     > > wrote:
    >     > > > >
    >     > > > > > Guozhang,
    >     > > > > >
    >     > > > > > Yes, you are correct. We have our own group processor. I
    > have more
    >     > > > > > information now.
    >     > > > > >
    >     > > > > > 1. I added ThreadId in the data when the app persists into
    > the
    >     > > > changelog
    >     > > > > > topic.
    >     > > > > > 2. Thread-2 which was working with partition-0 had a timeout
    > issue.
    >     > > > > > 4. Thread-4 picked up this partition-0 as I can see its Id
    > in the
    >     > > > > > changelog.
    >     > > > > > 5. *But then Thread-2 and Thread-4 both were writing into the
    >     > > > partition-0
    >     > > > > > of the changelog, that too for the same key.*
    >     > > > > >
    >     > > > > > So I was clearly able to see that two threads were
    > overwriting data
    >     > > of
    >     > > > > one
    >     > > > > > another into the state store leading to a corrupted state.
    > This
    >     > > > confirms
    >     > > > > my
    >     > > > > > theory that it was an issue of concurrent update. This was
    >     > something
    >     > > > > > totally unexpected. I suspect that Thread-2 continues to
    > persist
    >     > its
    >     > > > > > in-memory state, maybe because It wasn't stopped after the
    > timeout
    >     > > > > > exception. Is there a configuration possible in the Kafka
    > stream
    >     > > which
    >     > > > > > could lead to this?
    >     > > > > >
    >     > > > > > There was no network issue, our CPU was highly throttled by
    >     > > Kubernetes.
    >     > > > > We
    >     > > > > > gave more resources, also decreased the fetch-size so we
    > have more
    >     > > I/O
    >     > > > to
    >     > > > > > Cpu time ratio than before, and then there was no timeout
    > issue,
    >     > > hence
    >     > > > no
    >     > > > > > reassignment and hence no corrupted state.
    >     > > > > >
    >     > > > > > I really appreciate your help here...
    >     > > > > > Thanks!
    >     > > > > > Mangat
    >     > > > > >
    >     > > > > >
    >     > > > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <
    > wangguoz@gmail.com>
    >     > > > > wrote:
    >     > > > > >
    >     > > > > > > Hey Mangat,
    >     > > > > > >
    >     > > > > > > A. With at least once, Streams does not make sure
    > atomicity of
    >     > 1) /
    >     > > > 2);
    >     > > > > > > with exactly once, atomicity is indeed guaranteed with
    >     > > transactional
    >     > > > > > > messaging.
    >     > > > > > >
    >     > > > > > > B. If you are using processor API, then I'm assuming you
    > did your
    >     > > own
    >     > > > > > > group-by processor right? In that case, the partition key
    > would
    >     > > just
    >     > > > be
    >     > > > > > the
    >     > > > > > > record key when you are sending to the repartition topic.
    >     > > > > > >
    >     > > > > > >
    >     > > > > > > Guozhang
    >     > > > > > >
    >     > > > > > >
    >     > > > > > >
    >     > > > > > >
    >     > > > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <
    > mangatmodi@gmail.com>
    >     > > > > wrote:
    >     > > > > > >
    >     > > > > > > > Thanks again, that makes things clear. I still have some
    >     > > questions
    >     > > > > here
    >     > > > > > > > then.
    >     > > > > > > >
    >     > > > > > > > A.  For each record we read, we do two updates
    >     > > > > > > >       1. Changelog topic of the state store.
    >     > > > > > > >       2. Output topic aka sink.
    >     > > > > > > >       Does the Kafka stream app make sure that either
    > both are
    >     > > > > > committed
    >     > > > > > > or
    >     > > > > > > > neither?
    >     > > > > > > >
    >     > > > > > > > B.  Out Input topic actually has the as (a,b,c), but we
    >     > partition
    >     > > > > with
    >     > > > > > > only
    >     > > > > > > > (a). We do this because we have different compaction
    >     > requirements
    >     > > > > than
    >     > > > > > > the
    >     > > > > > > > partitions. It will still work as all (a,b,c) records
    > will go
    >     > to
    >     > > > the
    >     > > > > > same
    >     > > > > > > > partition. Now in aggregation, we group by (a,b,c). In
    > such
    >     > case
    >     > > > what
    >     > > > > > > will
    >     > > > > > > > be the partition key for the changelog topic?
    >     > > > > > > >
    >     > > > > > > > Note that we use low-level processor API and don't commit
    >     > > > ourselves.
    >     > > > > > > >
    >     > > > > > > > Regards,
    >     > > > > > > > Mangat
    >     > > > > > > >
    >     > > > > > > >
    >     > > > > > > >
    >     > > > > > > >
    >     > > > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <
    >     > wangguoz@gmail.com
    >     > > >
    >     > > > > > wrote:
    >     > > > > > > >
    >     > > > > > > > > Hi Mangat,
    >     > > > > > > > >
    >     > > > > > > > > Please see my replies inline below.
    >     > > > > > > > >
    >     > > > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <
    >     > > mangatmodi@gmail.com>
    >     > > > > > > wrote:
    >     > > > > > > > >
    >     > > > > > > > > > @Guozhang Wang
    >     > > > > > > > > >
    >     > > > > > > > > > Thanks for the reply.  Indeed I am finding it
    > difficult to
    >     > > > > explain
    >     > > > > > > this
    >     > > > > > > > > > state. I checked the code many times. There can be a
    > bug
    >     > but
    >     > > I
    >     > > > > fail
    >     > > > > > > to
    >     > > > > > > > > see
    >     > > > > > > > > > it. There are several things about the Kafka streams
    > that I
    >     > > > don't
    >     > > > > > > > > > understand, which makes it harder for me to reason.
    >     > > > > > > > > >
    >     > > > > > > > > > 1. What is the partition key for the changelog
    > topics? Is
    >     > it
    >     > > > the
    >     > > > > > same
    >     > > > > > > > as
    >     > > > > > > > > > the Input key or the state store key? Or maybe the
    > thread
    >     > > > > specifies
    >     > > > > > > the
    >     > > > > > > > > > partition as it knows the input partition it is
    > subscribed
    >     > > to?
    >     > > > If
    >     > > > > > the
    >     > > > > > > > > input
    >     > > > > > > > > > topic and state store are differently partitioned
    > then we
    >     > can
    >     > > > > > explain
    >     > > > > > > > the
    >     > > > > > > > > > issue here.
    >     > > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > > In Kafka Stream's changelog, the "partition key" of
    > Kafka
    >     > > > messages
    >     > > > > is
    >     > > > > > > the
    >     > > > > > > > > same as the "message key" itself. And the message key
    > is the
    >     > > same
    >     > > > > as
    >     > > > > > > the
    >     > > > > > > > > state store key.
    >     > > > > > > > >
    >     > > > > > > > > Since the state store here should be storing the
    > running
    >     > > > aggregate,
    >     > > > > > it
    >     > > > > > > > > means that the partition key is the same as the
    > aggregated
    >     > key.
    >     > > > > > > > >
    >     > > > > > > > > If you are doing a group-by aggregation here, where the
    >     > > group-by
    >     > > > > keys
    >     > > > > > > are
    >     > > > > > > > > different from the source input topic's keys, hence
    > the state
    >     > > > store
    >     > > > > > > keys
    >     > > > > > > > > would be different with the input topic keys.
    >     > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > > > 2. Is there a background thread to persist in the
    > state
    >     > store
    >     > > > > when
    >     > > > > > > > > caching
    >     > > > > > > > > > is disabled? When will the app commit the log for
    > the input
    >     > > > > topic?
    >     > > > > > Is
    >     > > > > > > > it
    >     > > > > > > > > > when sink writes into the output topic or when the
    > state
    >     > > store
    >     > > > > > writes
    >     > > > > > > > > into
    >     > > > > > > > > > the changelog topic? Because, if the app commits the
    > record
    >     > > > > before
    >     > > > > > > the
    >     > > > > > > > > data
    >     > > > > > > > > > was written to changelog topic then we can again
    > explain
    >     > this
    >     > > > > state
    >     > > > > > > > > >
    >     > > > > > > > > > The commit happens *after* the local state store, as
    > well
    >     > as
    >     > > > the
    >     > > > > > > > > changelog records sent by the Streams' producers, have
    > been
    >     > > > > flushed.
    >     > > > > > > I.e.
    >     > > > > > > > > if there's a failure in between, you would re-process
    > some
    >     > > source
    >     > > > > > > records
    >     > > > > > > > > and hence cause duplicates, but no data loss (a.k.a.
    > the
    >     > > > > > at_least_once
    >     > > > > > > > > semantics).
    >     > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > > > >You may also consider upgrading to 2.6.x or higher
    > version
    >     > > and
    >     > > > > see
    >     > > > > > > if
    >     > > > > > > > > this
    >     > > > > > > > > > issue goes away.
    >     > > > > > > > > > Do you mean the client or the Kafka broker? I will be
    >     > > upgrading
    >     > > > > the
    >     > > > > > > > > client
    >     > > > > > > > > > to 2.7.0 soon.
    >     > > > > > > > > >
    >     > > > > > > > > > I meant the client.
    >     > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > > > Sadly looking into the timestamp will not help much
    > as we
    >     > use
    >     > > > > some
    >     > > > > > > > > business
    >     > > > > > > > > > time field to set the record timestamp. If I am
    > right,
    >     > there
    >     > > is
    >     > > > > no
    >     > > > > > > way
    >     > > > > > > > > now
    >     > > > > > > > > > to know that when a Producer wrote a record in a
    > Kafka
    >     > topic.
    >     > > > > > > > > >
    >     > > > > > > > > > Regards,
    >     > > > > > > > > > Mangat
    >     > > > > > > > > >
    >     > > > > > > > > >
    >     > > > > > > > > >
    >     > > > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
    >     > > > wangguoz@gmail.com
    >     > > > > >
    >     > > > > > > > wrote:
    >     > > > > > > > > >
    >     > > > > > > > > > > Hello Mangat,
    >     > > > > > > > > > >
    >     > > > > > > > > > > With at least once, although some records maybe
    > processed
    >     > > > > > multiple
    >     > > > > > > > > times
    >     > > > > > > > > > > their process ordering should not be violated, so
    > what
    >     > you
    >     > > > > > observed
    >     > > > > > > > is
    >     > > > > > > > > > not
    >     > > > > > > > > > > expected. What caught my eyes are this section in
    > your
    >     > > output
    >     > > > > > > > > changelogs
    >     > > > > > > > > > > (high-lighted):
    >     > > > > > > > > > >
    >     > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > Key1, null
    >     > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > Key1, null  (processed again)
    >     > > > > > > > > > > Key1, V2
    >     > > > > > > > > > > Key1, null
    >     > > > > > > > > > >
    >     > > > > > > > > > > *Key1, V1Key1,V2*
    >     > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2
    > tombstone yet
    >     > but
    >     > > > > > > > reprocessed
    >     > > > > > > > > > V1
    >     > > > > > > > > > > again due to reassignment)
    >     > > > > > > > > > >
    >     > > > > > > > > > > They seem to be the result of first receiving a
    > tombstone
    >     > > > which
    >     > > > > > > > removes
    >     > > > > > > > > > V1
    >     > > > > > > > > > > and then a new record that adds V2. However, since
    >     > caching
    >     > > is
    >     > > > > > > > disabled
    >     > > > > > > > > > you
    >     > > > > > > > > > > should get
    >     > > > > > > > > > >
    >     > > > > > > > > > > *Key1,V1*
    >     > > > > > > > > > > *Key1,null*
    >     > > > > > > > > > > *Key1,V2*
    >     > > > > > > > > > >
    >     > > > > > > > > > > instead; without the actual code snippet I cannot
    > tell
    >     > more
    >     > > > > > what's
    >     > > > > > > > > > > happening here. If you can look into the logs you
    > can
    >     > > record
    >     > > > > each
    >     > > > > > > > time
    >     > > > > > > > > > when
    >     > > > > > > > > > > partition migrates, how many records from the
    > changelog
    >     > was
    >     > > > > > > replayed
    >     > > > > > > > to
    >     > > > > > > > > > > restore the store, and from which offset on the
    > input
    >     > topic
    >     > > > > does
    >     > > > > > > > > Streams
    >     > > > > > > > > > > resume processing. You may also consider upgrading
    > to
    >     > 2.6.x
    >     > > > or
    >     > > > > > > higher
    >     > > > > > > > > > > version and see if this issue goes away.
    >     > > > > > > > > > >
    >     > > > > > > > > > >
    >     > > > > > > > > > > Guozhang
    >     > > > > > > > > > >
    >     > > > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
    >     > > > > mangatmodi@gmail.com>
    >     > > > > > > > > wrote:
    >     > > > > > > > > > >
    >     > > > > > > > > > > > Hey,
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > We have the following setup in our
    > infrastructure.
    >     > > > > > > > > > > >
    >     > > > > > > > > > > >    1. Kafka - 2.5.1
    >     > > > > > > > > > > >    2. Apps use kafka streams `org.apache.kafka`
    > version
    >     > > > 2.5.1
    >     > > > > > > > library
    >     > > > > > > > > > > >    3. Low level processor API is used with
    >     > *atleast-once*
    >     > > > > > > semantics
    >     > > > > > > > > > > >    4. State stores are *in-memory* with *caching
    >     > > disabled*
    >     > > > > and
    >     > > > > > > > > > *changelog
    >     > > > > > > > > > > >    enabled*
    >     > > > > > > > > > > >
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > Is it possible that during state replication and
    >     > > partition
    >     > > > > > > > > > reassignment,
    >     > > > > > > > > > > > the input data is not always applied to the state
    >     > store?
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > 1. Let's say the input topic is having records
    > like
    >     > > > following
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > ```
    >     > > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > > Key1, null (tombstone)
    >     > > > > > > > > > > > Key1, V2
    >     > > > > > > > > > > > Key1, null
    >     > > > > > > > > > > > Key1, V3
    >     > > > > > > > > > > > Key1, V4
    >     > > > > > > > > > > > ```
    >     > > > > > > > > > > > 2. The app has an aggregation function which
    > takes
    >     > these
    >     > > > > record
    >     > > > > > > and
    >     > > > > > > > > > > update
    >     > > > > > > > > > > > the state store so that changelog shall be
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > ```
    >     > > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > > Key1, null (tombstone)
    >     > > > > > > > > > > > Key1, V2
    >     > > > > > > > > > > > Key1, null
    >     > > > > > > > > > > > Key1, V3
    >     > > > > > > > > > > > Key1, V3 + V4
    >     > > > > > > > > > > > ```
    >     > > > > > > > > > > > Let's say the partition responsible for
    > processing the
    >     > > > above
    >     > > > > > key
    >     > > > > > > > was
    >     > > > > > > > > > > > several times reallocated to different threads
    > due to
    >     > > some
    >     > > > > > infra
    >     > > > > > > > > issues
    >     > > > > > > > > > > we
    >     > > > > > > > > > > > are having(in Kubernetes where we run the app,
    > not the
    >     > > > Kafka
    >     > > > > > > > > cluster).
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > I see the following record in the changelogs
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > ```
    >     > > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > > Key1, null
    >     > > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > > Key1, null  (processed again)
    >     > > > > > > > > > > > Key1, V2
    >     > > > > > > > > > > > Key1, null
    >     > > > > > > > > > > > Key1, V1
    >     > > > > > > > > > > > Key1,V2
    >     > > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2
    > tombstone yet
    >     > > but
    >     > > > > > > > > reprocessed
    >     > > > > > > > > > > V1
    >     > > > > > > > > > > > again due to reassignment)
    >     > > > > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone,
    > but then
    >     > V1
    >     > > > > > > tombstone
    >     > > > > > > > > > > should
    >     > > > > > > > > > > > have been applied also!!)
    >     > > > > > > > > > > > Key1, V2+V1 (it is back!!!)
    >     > > > > > > > > > > > Key1,V1
    >     > > > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
    >     > > > > > > > > > > > ```
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > If you see this means several things
    >     > > > > > > > > > > > 1. The state is always correctly applied locally
    > (in
    >     > > > > developer
    >     > > > > > > > > laptop),
    >     > > > > > > > > > > > where there were no reassignments.
    >     > > > > > > > > > > > 2. The records are processed multiple times,
    > which is
    >     > > > > > > > understandable
    >     > > > > > > > > as
    >     > > > > > > > > > > we
    >     > > > > > > > > > > > have at least symantics here.
    >     > > > > > > > > > > > 3. As long as we re-apply the same events in the
    > same
    >     > > > orders
    >     > > > > we
    >     > > > > > > are
    >     > > > > > > > > > > golden
    >     > > > > > > > > > > > but looks like some records are skipped, but
    > here it
    >     > > looks
    >     > > > as
    >     > > > > > if
    >     > > > > > > we
    >     > > > > > > > > > have
    >     > > > > > > > > > > > multiple consumers reading and update the same
    > topics,
    >     > > > > leading
    >     > > > > > to
    >     > > > > > > > > race
    >     > > > > > > > > > > > conditions.
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > Is there any way, Kafka streams' state
    > replication
    >     > could
    >     > > > lead
    >     > > > > > to
    >     > > > > > > > > such a
    >     > > > > > > > > > > > race condition?
    >     > > > > > > > > > > >
    >     > > > > > > > > > > > Regards,
    >     > > > > > > > > > > > Mangat
    >     > > > > > > > > > > >
    >     > > > > > > > > > >
    >     > > > > > > > > > >
    >     > > > > > > > > > > --
    >     > > > > > > > > > > -- Guozhang
    >     > > > > > > > > > >
    >     > > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > >
    >     > > > > > > > > --
    >     > > > > > > > > -- Guozhang
    >     > > > > > > > >
    >     > > > > > > >
    >     > > > > > >
    >     > > > > > >
    >     > > > > > > --
    >     > > > > > > -- Guozhang
    >     > > > > > >
    >     > > > > >
    >     > > > >
    >     > > > >
    >     > > > > --
    >     > > > > -- Guozhang
    >     > > > >
    >     > > >
    >     > >
    >     > >
    >     > > --
    >     > > -- Guozhang
    >     > >
    >     >
    >
    >
    >     --
    >     -- Guozhang
    >
    >
    >
    
    -- 
    -- Guozhang
    


Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Mohan,

Sorry for getting to this thread late. Just to address your concerns here: if
in your topology there's no output at all to any topics (sink topics,
changelog topics), then yes, the zombie would not be detected; but on the
other hand the topology itself is not making any visible changes to the
external systems anyway -- you can just think of a zombie that keeps
doing redundant work and then drops the results on the floor, since nothing
is reflected outside.

On the other hand, if the tasks are at least writing to some sink topics,
then zombies would still be detected.


Guozhang


Re: Kafka Stream: State replication seems unpredictable.

Posted by "Parthasarathy, Mohan" <mp...@hpe.com>.
Guozhang,

What does this mean if the changelog topic was disabled? If thread 2 and thread 4 are running on two different nodes and a rebalance occurs, thread 2 will not realize it is a zombie without the write to the changelog topic, right? I am trying to understand the cases under which the changelog topic can ever be disabled.

Thanks
Mohan


On 4/21/21, 10:22 PM, "Guozhang Wang" <wa...@gmail.com> wrote:

    Hello Mangat,
    
    I think using persistent store that relies on in-memory stores could help
    if the threads are from the same instance.
    
    Guozhang
    
    On Tue, Apr 20, 2021 at 12:54 AM mangat rai <ma...@gmail.com> wrote:
    
    > Hey Guozhang,
    >
    > Thanks for creating the issue. Yes, you are right, this will happen only
    > with the consecutive rebalancing as after some time zombie thread will stop
    > and re-join the group and the new thread will always overwrite the state
    > with the latest data. In our poor infra setup, the rebalancing was
    > happening many times in a row.
    >
    > Now, we can't guarantee that the consecutive rebalancing will not happen
    > again (we reduced fetch-size which fixed it in many ways), will any of the
    > following work as a workaround?
    >
    > 1. Use persistent store instead of in-memory. The new thread will never get
    > the lock hence we will lose availability but keep things consistent.
    > 2. Use exactly-once semantics. However, we might need to redesign our apps.
    > It's a bigger change.
    >
    > Regards,
    > Mangat
    >
    > On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wa...@gmail.com> wrote:
    >
    > > Hello Mangat,
    > >
    > > What you've encountered is a "zombie writer" issue, that is, Thread-2 did
    > > not know there's already a new rebalance and hence its partitions have
    > been
    > > migrated out, until it tries to commit and then got notified of the
    > > illegal-generation error and realize itself is the "zombie" already. This
    > > case would still persist even with incremental rebalancing.
    > >
    > > I've filed https://issues.apache.org/jira/browse/KAFKA-12693  to
    > summarize
    > > the situation. Please LMK if that explanation is clear to you.
    > >
    > > On Mon, Apr 19, 2021 at 12:58 AM mangat rai <ma...@gmail.com>
    > wrote:
    > >
    > > > Thanks, Guozhang,
    > > >
    > > > I was knocking myself with Kafka's various consumer rebalancing
    > > algorithms
    > > > in the last 2 days. Could I generalize this problem as
    > > >
    > > >
    > > >
    > > > *Any in-memory state store backed by a changelog topic will always risk
    > > > having interleaved writes from two different writers during
    > rebalancing?*
    > > > In our case, CPU throttling made it worse as thread-2 didn't try to
    > > commit
    > > > for a long time. Also,
    > > >
    > > > 1. Do you think if we disable the incremental rebalancing, we will not
    > > have
    > > > this issue because If I understood correctly Thread-4 will not start
    > > > processing until the state is completely transferred from Thread-2.
    > > > 2. If yes, how can we disable it without downgrading the client?
    > > >
    > > > Since we have a very low scale and no real-time computing requirement,
    > we
    > > > will be happy to sacrifice the availability to have consistency.
    > > >
    > > > Regards,
    > > > Mangat
    > > >
    > > >
    > > >
    > > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wa...@gmail.com>
    > > wrote:
    > > >
    > > > > Hi Mangat:
    > > > >
    > > > > I think I found the issue of your problem here.
    > > > >
    > > > > It seems thread-2's partition was assigned to thread-4 while thread-2
    > > was
    > > > > not aware (because it missed a rebalance, this is normal scenario);
    > in
    > > > > other words, thread2 becomes a "zombie". It would stay in that zombie
    > > > state
    > > > > until it tried to commit, in which it would get an error from the
    > > brokers
    > > > > and realize its zombie identity and re-joins the group.
    > > > >
    > > > > During that period of time, before the commit was issued, it would
    > > > continue
    > > > > trying to write to its local states; here are several scenarios:
    > > > >
    > > > > 1) if thread-2/4 are belonging to two different nodes then that is
    > > fine,
    > > > > since they will write to different local state stores.
    > > > > 2) if they belong to the same nodes, and
    > > > >    a) the state stores are persistent then they would have risks of
    > > > > contention; this is guarded by the state directory locks (as file
    > > locks)
    > > > in
    > > > > which case the new owner thread-4 should not be able to get on the
    > > local
    > > > > state files.
    > > > >    b) the state stores are in-memory, in which case that is fine
    > since
    > > > the
    > > > > in-memory stores are kept separate as well.
    > > > >
    > > > > In your case: 2.b), the issue is that the changelog would still be
    > > shared
    > > > > between the two --- but note that this is the same case as in case 1)
    > > as
    > > > > well. And this means at that time the changelog is shared by two
    > > writers
    > > > > sending records interleaving. And if there’s a tombstone that was
    > > > intended
    > > > > for a record A, but when it was written interleaving and there’s
    > > another
    > > > > record B in between, that tombstone would effectively delete record
    > B.
    > > > The
    > > > > key here is that, when we replay the changelogs, we replay it
    > > completely
    > > > > following offset ordering.
    > > > >
    > > > >
    > > > >
    > > > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <ma...@gmail.com>
    > > wrote:
    > > > >
    > > > > > Guozhang,
    > > > > >
    > > > > > Yes, you are correct. We have our own group processor. I have more
    > > > > > information now.
    > > > > >
    > > > > > 1. I added ThreadId in the data when the app persists into the
    > > > changelog
    > > > > > topic.
    > > > > > 2. Thread-2 which was working with partition-0 had a timeout issue.
    > > > > > 4. Thread-4 picked up this partition-0 as I can see its Id in the
    > > > > > changelog.
    > > > > > 5. *But then Thread-2 and Thread-4 both were writing into the
    > > > partition-0
    > > > > > of the changelog, that too for the same key.*
    > > > > >
    > > > > > So I was clearly able to see that two threads were overwriting data
    > > of
    > > > > one
    > > > > > another into the state store leading to a corrupted state. This
    > > > confirms
    > > > > my
    > > > > > theory that it was an issue of concurrent update. This was
    > something
    > > > > > totally unexpected. I suspect that Thread-2 continues to persist
    > its
    > > > > > in-memory state, maybe because It wasn't stopped after the timeout
    > > > > > exception. Is there a configuration possible in the Kafka stream
    > > which
    > > > > > could lead to this?
    > > > > >
    > > > > > There was no network issue, our CPU was highly throttled by
    > > Kubernetes.
    > > > > We
    > > > > > gave more resources, also decreased the fetch-size so we have more
    > > I/O
    > > > to
    > > > > > Cpu time ratio than before, and then there was no timeout issue,
    > > hence
    > > > no
    > > > > > reassignment and hence no corrupted state.
    > > > > >
    > > > > > I really appreciate your help here...
    > > > > > Thanks!
    > > > > > Mangat
    > > > > >
    > > > > >
    > > > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com>
    > > > > wrote:
    > > > > >
    > > > > > > Hey Mangat,
    > > > > > >
    > > > > > > A. With at least once, Streams does not make sure atomicity of
    > 1) /
    > > > 2);
    > > > > > > with exactly once, atomicity is indeed guaranteed with
    > > transactional
    > > > > > > messaging.
    > > > > > >
    > > > > > > B. If you are using processor API, then I'm assuming you did your
    > > own
    > > > > > > group-by processor right? In that case, the partition key would
    > > just
    > > > be
    > > > > > the
    > > > > > > record key when you are sending to the repartition topic.
    > > > > > >
    > > > > > >
    > > > > > > Guozhang
    > > > > > >
    > > > > > >
    > > > > > >
    > > > > > >
    > > > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com>
    > > > > wrote:
    > > > > > >
    > > > > > > > Thanks again, that makes things clear. I still have some
    > > questions
    > > > > here
    > > > > > > > then.
    > > > > > > >
    > > > > > > > A.  For each record we read, we do two updates
    > > > > > > >       1. Changelog topic of the state store.
    > > > > > > >       2. Output topic aka sink.
    > > > > > > >       Does the Kafka stream app make sure that either both are
    > > > > > committed
    > > > > > > or
    > > > > > > > neither?
    > > > > > > >
    > > > > > > > B.  Out Input topic actually has the as (a,b,c), but we
    > partition
    > > > > with
    > > > > > > only
    > > > > > > > (a). We do this because we have different compaction
    > requirements
    > > > > than
    > > > > > > the
    > > > > > > > partitions. It will still work as all (a,b,c) records will go
    > to
    > > > the
    > > > > > same
    > > > > > > > partition. Now in aggregation, we group by (a,b,c). In such
    > case
    > > > what
    > > > > > > will
    > > > > > > > be the partition key for the changelog topic?
    > > > > > > >
    > > > > > > > Note that we use low-level processor API and don't commit
    > > > ourselves.
    > > > > > > >
    > > > > > > > Regards,
    > > > > > > > Mangat
    > > > > > > >
    > > > > > > >
    > > > > > > >
    > > > > > > >
    > > > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <
    > wangguoz@gmail.com
    > > >
    > > > > > wrote:
    > > > > > > >
    > > > > > > > > Hi Mangat,
    > > > > > > > >
    > > > > > > > > Please see my replies inline below.
    > > > > > > > >
    > > > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <
    > > mangatmodi@gmail.com>
    > > > > > > wrote:
    > > > > > > > >
    > > > > > > > > > @Guozhang Wang
    > > > > > > > > >
    > > > > > > > > > Thanks for the reply.  Indeed I am finding it difficult to
    > > > > explain
    > > > > > > this
    > > > > > > > > > state. I checked the code many times. There can be a bug
    > but
    > > I
    > > > > fail
    > > > > > > to
    > > > > > > > > see
    > > > > > > > > > it. There are several things about the Kafka streams that I
    > > > don't
    > > > > > > > > > understand, which makes it harder for me to reason.
    > > > > > > > > >
    > > > > > > > > > 1. What is the partition key for the changelog topics? Is
    > it
    > > > the
    > > > > > same
    > > > > > > > as
    > > > > > > > > > the Input key or the state store key? Or maybe the thread
    > > > > specifies
    > > > > > > the
    > > > > > > > > > partition as it knows the input partition it is subscribed
    > > to?
    > > > If
    > > > > > the
    > > > > > > > > input
    > > > > > > > > > topic and state store are differently partitioned then we
    > can
    > > > > > explain
    > > > > > > > the
    > > > > > > > > > issue here.
    > > > > > > > > >
    > > > > > > > >
    > > > > > > > > In Kafka Stream's changelog, the "partition key" of Kafka
    > > > messages
    > > > > is
    > > > > > > the
    > > > > > > > > same as the "message key" itself. And the message key is the
    > > same
    > > > > as
    > > > > > > the
    > > > > > > > > state store key.
    > > > > > > > >
    > > > > > > > > Since the state store here should be storing the running
    > > > aggregate,
    > > > > > it
    > > > > > > > > means that the partition key is the same as the aggregated
    > key.
    > > > > > > > >
    > > > > > > > > If you are doing a group-by aggregation here, where the
    > > group-by
    > > > > keys
    > > > > > > are
    > > > > > > > > different from the source input topic's keys, hence the state
    > > > store
    > > > > > > keys
    > > > > > > > > would be different with the input topic keys.
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > > 2. Is there a background thread to persist in the state
    > store
    > > > > when
    > > > > > > > > caching
    > > > > > > > > > is disabled? When will the app commit the log for the input
    > > > > topic?
    > > > > > Is
    > > > > > > > it
    > > > > > > > > > when sink writes into the output topic or when the state
    > > store
    > > > > > writes
    > > > > > > > > into
    > > > > > > > > > the changelog topic? Because, if the app commits the record
    > > > > before
    > > > > > > the
    > > > > > > > > data
    > > > > > > > > > was written to changelog topic then we can again explain
    > this
    > > > > state
    > > > > > > > > >
    > > > > > > > > > The commit happens *after* the local state store, as well
    > as
    > > > the
    > > > > > > > > changelog records sent by the Streams' producers, have been
    > > > > flushed.
    > > > > > > I.e.
    > > > > > > > > if there's a failure in between, you would re-process some
    > > source
    > > > > > > records
    > > > > > > > > and hence cause duplicates, but no data loss (a.k.a. the
    > > > > > at_least_once
    > > > > > > > > semantics).
    > > > > > > > >
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > > >You may also consider upgrading to 2.6.x or higher version
    > > and
    > > > > see
    > > > > > > if
    > > > > > > > > this
    > > > > > > > > > issue goes away.
    > > > > > > > > > Do you mean the client or the Kafka broker? I will be
    > > upgrading
    > > > > the
    > > > > > > > > client
    > > > > > > > > > to 2.7.0 soon.
    > > > > > > > > >
    > > > > > > > > > I meant the client.
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > > Sadly looking into the timestamp will not help much as we
    > use
    > > > > some
    > > > > > > > > business
    > > > > > > > > > time field to set the record timestamp. If I am right,
    > there
    > > is
    > > > > no
    > > > > > > way
    > > > > > > > > now
    > > > > > > > > > to know that when a Producer wrote a record in a Kafka
    > topic.
    > > > > > > > > >
    > > > > > > > > > Regards,
    > > > > > > > > > Mangat
    > > > > > > > > >
    > > > > > > > > >
    > > > > > > > > >
    > > > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
    > > > wangguoz@gmail.com
    > > > > >
    > > > > > > > wrote:
    > > > > > > > > >
    > > > > > > > > > > Hello Mangat,
    > > > > > > > > > >
    > > > > > > > > > > With at least once, although some records maybe processed
    > > > > > multiple
    > > > > > > > > times
    > > > > > > > > > > their process ordering should not be violated, so what
    > you
    > > > > > observed
    > > > > > > > is
    > > > > > > > > > not
    > > > > > > > > > > expected. What caught my eyes are this section in your
    > > output
    > > > > > > > > changelogs
    > > > > > > > > > > (high-lighted):
    > > > > > > > > > >
    > > > > > > > > > > Key1, V1
    > > > > > > > > > > Key1, null
    > > > > > > > > > > Key1, V1
    > > > > > > > > > > Key1, null  (processed again)
    > > > > > > > > > > Key1, V2
    > > > > > > > > > > Key1, null
    > > > > > > > > > >
    > > > > > > > > > > *Key1, V1Key1,V2*
    > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
    > but
    > > > > > > > reprocessed
    > > > > > > > > > V1
    > > > > > > > > > > again due to reassignment)
    > > > > > > > > > >
    > > > > > > > > > > They seem to be the result of first receiving a tombstone
    > > > which
    > > > > > > > removes
    > > > > > > > > > V1
    > > > > > > > > > > and then a new record that adds V2. However, since
    > caching
    > > is
    > > > > > > > disabled
    > > > > > > > > > you
    > > > > > > > > > > should get
    > > > > > > > > > >
    > > > > > > > > > > *Key1,V1*
    > > > > > > > > > > *Key1,null*
    > > > > > > > > > > *Key1,V2*
    > > > > > > > > > >
    > > > > > > > > > > instead; without the actual code snippet I cannot tell
    > more
    > > > > > what's
    > > > > > > > > > > happening here. If you can look into the logs you can
    > > record
    > > > > each
    > > > > > > > time
    > > > > > > > > > when
    > > > > > > > > > > partition migrates, how many records from the changelog
    > was
    > > > > > > replayed
    > > > > > > > to
    > > > > > > > > > > restore the store, and from which offset on the input
    > topic
    > > > > does
    > > > > > > > > Streams
    > > > > > > > > > > resume processing. You may also consider upgrading to
    > 2.6.x
    > > > or
    > > > > > > higher
    > > > > > > > > > > version and see if this issue goes away.
    > > > > > > > > > >
    > > > > > > > > > >
    > > > > > > > > > > Guozhang
    > > > > > > > > > >
    > > > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
    > > > > mangatmodi@gmail.com>
    > > > > > > > > wrote:
    > > > > > > > > > >
    > > > > > > > > > > > Hey,
    > > > > > > > > > > >
    > > > > > > > > > > > We have the following setup in our infrastructure.
    > > > > > > > > > > >
    > > > > > > > > > > >    1. Kafka - 2.5.1
    > > > > > > > > > > >    2. Apps use kafka streams `org.apache.kafka` version
    > > > 2.5.1
    > > > > > > > library
    > > > > > > > > > > >    3. Low level processor API is used with
    > *atleast-once*
    > > > > > > semantics
    > > > > > > > > > > >    4. State stores are *in-memory* with *caching
    > > disabled*
    > > > > and
    > > > > > > > > > *changelog
    > > > > > > > > > > >    enabled*
    > > > > > > > > > > >
    > > > > > > > > > > >
    > > > > > > > > > > > Is it possible that during state replication and
    > > partition
    > > > > > > > > > reassignment,
    > > > > > > > > > > > the input data is not always applied to the state
    > store?
    > > > > > > > > > > >
    > > > > > > > > > > > 1. Let's say the input topic is having records like
    > > > following
    > > > > > > > > > > >
    > > > > > > > > > > > ```
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null (tombstone)
    > > > > > > > > > > > Key1, V2
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V3
    > > > > > > > > > > > Key1, V4
    > > > > > > > > > > > ```
    > > > > > > > > > > > 2. The app has an aggregation function which takes
    > these
    > > > > record
    > > > > > > and
    > > > > > > > > > > update
    > > > > > > > > > > > the state store so that changelog shall be
    > > > > > > > > > > >
    > > > > > > > > > > > ```
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null (tombstone)
    > > > > > > > > > > > Key1, V2
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V3
    > > > > > > > > > > > Key1, V3 + V4
    > > > > > > > > > > > ```
    > > > > > > > > > > > Let's say the partition responsible for processing the
    > > > above
    > > > > > key
    > > > > > > > was
    > > > > > > > > > > > several times reallocated to different threads due to
    > > some
    > > > > > infra
    > > > > > > > > issues
    > > > > > > > > > > we
    > > > > > > > > > > > are having(in Kubernetes where we run the app, not the
    > > > Kafka
    > > > > > > > > cluster).
    > > > > > > > > > > >
    > > > > > > > > > > > I see the following record in the changelogs
    > > > > > > > > > > >
    > > > > > > > > > > > ```
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null  (processed again)
    > > > > > > > > > > > Key1, V2
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1,V2
    > > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
    > > but
    > > > > > > > > reprocessed
    > > > > > > > > > > V1
    > > > > > > > > > > > again due to reassignment)
    > > > > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then
    > V1
    > > > > > > tombstone
    > > > > > > > > > > should
    > > > > > > > > > > > have been applied also!!)
    > > > > > > > > > > > Key1, V2+V1 (it is back!!!)
    > > > > > > > > > > > Key1,V1
    > > > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
    > > > > > > > > > > > ```
    > > > > > > > > > > >
    > > > > > > > > > > > If you see this means several things
    > > > > > > > > > > > 1. The state is always correctly applied locally (in
    > > > > developer
    > > > > > > > > laptop),
    > > > > > > > > > > > where there were no reassignments.
    > > > > > > > > > > > 2. The records are processed multiple times, which is
    > > > > > > > understandable
    > > > > > > > > as
    > > > > > > > > > > we
    > > > > > > > > > > > have at least symantics here.
    > > > > > > > > > > > 3. As long as we re-apply the same events in the same
    > > > orders
    > > > > we
    > > > > > > are
    > > > > > > > > > > golden
    > > > > > > > > > > > but looks like some records are skipped, but here it
    > > looks
    > > > as
    > > > > > if
    > > > > > > we
    > > > > > > > > > have
    > > > > > > > > > > > multiple consumers reading and update the same topics,
    > > > > leading
    > > > > > to
    > > > > > > > > race
    > > > > > > > > > > > conditions.
    > > > > > > > > > > >
    > > > > > > > > > > > Is there any way, Kafka streams' state replication
    > could
    > > > lead
    > > > > > to
    > > > > > > > > such a
    > > > > > > > > > > > race condition?
    > > > > > > > > > > >
    > > > > > > > > > > > Regards,
    > > > > > > > > > > > Mangat
    > > > > > > > > > > >
    > > > > > > > > > >
    > > > > > > > > > >
    > > > > > > > > > > --
    > > > > > > > > > > -- Guozhang
    > > > > > > > > > >
    > > > > > > > > >
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > --
    > > > > > > > > -- Guozhang
    > > > > > > > >
    > > > > > > >
    > > > > > >
    > > > > > >
    > > > > > > --
    > > > > > > -- Guozhang
    > > > > > >
    > > > > >
    > > > >
    > > > >
    > > > > --
    > > > > -- Guozhang
    > > > >
    > > >
    > >
    > >
    > > --
    > > -- Guozhang
    > >
    >
    
    
    -- 
    -- Guozhang
    


Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Mangat,

I think using a persistent store instead of the in-memory stores could help
if the threads are from the same instance.
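
(A minimal Processor API sketch of that swap, using the public Stores factory;
the class, store name and serdes are placeholders, not taken from this thread.)

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StoreChoiceSketch {
    // Setup described in this thread: in-memory store, caching disabled,
    // changelog enabled (logging is on by default on the builder).
    static StoreBuilder<KeyValueStore<String, String>> inMemoryStore() {
        return Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("aggregate-store"), // placeholder name
                Serdes.String(), Serdes.String())
            .withCachingDisabled();
    }

    // Possible swap: a persistent (RocksDB) store. Per the explanation earlier
    // in this thread, the state-directory lock should keep the new owner thread
    // on the same instance from opening the files while a zombie still holds them.
    static StoreBuilder<KeyValueStore<String, String>> persistentStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("aggregate-store"), // placeholder name
                Serdes.String(), Serdes.String())
            .withCachingDisabled();
    }
}
```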

Guozhang

On Tue, Apr 20, 2021 at 12:54 AM mangat rai <ma...@gmail.com> wrote:

> Hey Guozhang,
>
> Thanks for creating the issue. Yes, you are right, this will happen only
> with the consecutive rebalancing as after some time zombie thread will stop
> and re-join the group and the new thread will always overwrite the state
> with the latest data. In our poor infra setup, the rebalancing was
> happening many times in a row.
>
> Now, we can't guarantee that the consecutive rebalancing will not happen
> again (we reduced fetch-size which fixed it in many ways), will any of the
> following work as a workaround?
>
> 1. Use persistent store instead of in-memory. The new thread will never get
> the lock hence we will lose availability but keep things consistent.
> 2. Use exactly-once semantics. However, we might need to redesign our apps.
> It's a bigger change.
>
> Regards,
> Mangat
>
> On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Mangat,
> >
> > What you've encountered is a "zombie writer" issue, that is, Thread-2 did
> > not know there's already a new rebalance and hence its partitions have
> been
> > migrated out, until it tries to commit and then got notified of the
> > illegal-generation error and realize itself is the "zombie" already. This
> > case would still persist even with incremental rebalancing.
> >
> > I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to
> summarize
> > the situation. Please LMK if that explanation is clear to you.
> >
> > On Mon, Apr 19, 2021 at 12:58 AM mangat rai <ma...@gmail.com>
> wrote:
> >
> > > Thanks, Guozhang,
> > >
> > > I was knocking myself with Kafka's various consumer rebalancing
> > algorithms
> > > in the last 2 days. Could I generalize this problem as
> > >
> > >
> > >
> > > *Any in-memory state store backed by a changelog topic will always risk
> > > having interleaved writes from two different writers during
> rebalancing?*
> > > In our case, CPU throttling made it worse as thread-2 didn't try to
> > commit
> > > for a long time. Also,
> > >
> > > 1. Do you think if we disable the incremental rebalancing, we will not
> > have
> > > this issue because If I understood correctly Thread-4 will not start
> > > processing until the state is completely transferred from Thread-2.
> > > 2. If yes, how can we disable it without downgrading the client?
> > >
> > > Since we have a very low scale and no real-time computing requirement,
> we
> > > will be happy to sacrifice the availability to have consistency.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hi Mangat:
> > > >
> > > > I think I found the issue of your problem here.
> > > >
> > > > It seems thread-2's partition was assigned to thread-4 while thread-2
> > was
> > > > not aware (because it missed a rebalance, this is normal scenario);
> in
> > > > other words, thread2 becomes a "zombie". It would stay in that zombie
> > > state
> > > > until it tried to commit, in which it would get an error from the
> > brokers
> > > > and realize its zombie identity and re-joins the group.
> > > >
> > > > During that period of time, before the commit was issued, it would
> > > continue
> > > > trying to write to its local states; here are several scenarios:
> > > >
> > > > 1) if thread-2/4 are belonging to two different nodes then that is
> > fine,
> > > > since they will write to different local state stores.
> > > > 2) if they belong to the same nodes, and
> > > >    a) the state stores are persistent then they would have risks of
> > > > contention; this is guarded by the state directory locks (as file
> > locks)
> > > in
> > > > which case the new owner thread-4 should not be able to get on the
> > local
> > > > state files.
> > > >    b) the state stores are in-memory, in which case that is fine
> since
> > > the
> > > > in-memory stores are kept separate as well.
> > > >
> > > > In your case: 2.b), the issue is that the changelog would still be
> > shared
> > > > between the two --- but note that this is the same case as in case 1)
> > as
> > > > well. And this means at that time the changelog is shared by two
> > writers
> > > > sending records interleaving. And if there’s a tombstone that was
> > > intended
> > > > for a record A, but when it was written interleaving and there’s
> > another
> > > > record B in between, that tombstone would effectively delete record
> B.
> > > The
> > > > key here is that, when we replay the changelogs, we replay it
> > completely
> > > > following offset ordering.
> > > >
> > > >
> > > >
> > > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <ma...@gmail.com>
> > wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Yes, you are correct. We have our own group processor. I have more
> > > > > information now.
> > > > >
> > > > > 1. I added ThreadId in the data when the app persists into the
> > > changelog
> > > > > topic.
> > > > > 2. Thread-2 which was working with partition-0 had a timeout issue.
> > > > > 4. Thread-4 picked up this partition-0 as I can see its Id in the
> > > > > changelog.
> > > > > 5. *But then Thread-2 and Thread-4 both were writing into the
> > > partition-0
> > > > > of the changelog, that too for the same key.*
> > > > >
> > > > > So I was clearly able to see that two threads were overwriting data
> > of
> > > > one
> > > > > another into the state store leading to a corrupted state. This
> > > confirms
> > > > my
> > > > > theory that it was an issue of concurrent update. This was
> something
> > > > > totally unexpected. I suspect that Thread-2 continues to persist
> its
> > > > > in-memory state, maybe because It wasn't stopped after the timeout
> > > > > exception. Is there a configuration possible in the Kafka stream
> > which
> > > > > could lead to this?
> > > > >
> > > > > There was no network issue, our CPU was highly throttled by
> > Kubernetes.
> > > > We
> > > > > gave more resources, also decreased the fetch-size so we have more
> > I/O
> > > to
> > > > > Cpu time ratio than before, and then there was no timeout issue,
> > hence
> > > no
> > > > > reassignment and hence no corrupted state.
> > > > >
> > > > > I really appreciate your help here...
> > > > > Thanks!
> > > > > Mangat
> > > > >
> > > > >
> > > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hey Mangat,
> > > > > >
> > > > > > A. With at least once, Streams does not make sure atomicity of
> 1) /
> > > 2);
> > > > > > with exactly once, atomicity is indeed guaranteed with
> > transactional
> > > > > > messaging.
> > > > > >
> > > > > > B. If you are using processor API, then I'm assuming you did your
> > own
> > > > > > group-by processor right? In that case, the partition key would
> > just
> > > be
> > > > > the
> > > > > > record key when you are sending to the repartition topic.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Thanks again, that makes things clear. I still have some
> > questions
> > > > here
> > > > > > > then.
> > > > > > >
> > > > > > > A.  For each record we read, we do two updates
> > > > > > >       1. Changelog topic of the state store.
> > > > > > >       2. Output topic aka sink.
> > > > > > >       Does the Kafka stream app make sure that either both are
> > > > > committed
> > > > > > or
> > > > > > > neither?
> > > > > > >
> > > > > > > B.  Out Input topic actually has the as (a,b,c), but we
> partition
> > > > with
> > > > > > only
> > > > > > > (a). We do this because we have different compaction
> requirements
> > > > than
> > > > > > the
> > > > > > > partitions. It will still work as all (a,b,c) records will go
> to
> > > the
> > > > > same
> > > > > > > partition. Now in aggregation, we group by (a,b,c). In such
> case
> > > what
> > > > > > will
> > > > > > > be the partition key for the changelog topic?
> > > > > > >
> > > > > > > Note that we use low-level processor API and don't commit
> > > ourselves.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mangat
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Mangat,
> > > > > > > >
> > > > > > > > Please see my replies inline below.
> > > > > > > >
> > > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <
> > mangatmodi@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > @Guozhang Wang
> > > > > > > > >
> > > > > > > > > Thanks for the reply.  Indeed I am finding it difficult to
> > > > explain
> > > > > > this
> > > > > > > > > state. I checked the code many times. There can be a bug
> but
> > I
> > > > fail
> > > > > > to
> > > > > > > > see
> > > > > > > > > it. There are several things about the Kafka streams that I
> > > don't
> > > > > > > > > understand, which makes it harder for me to reason.
> > > > > > > > >
> > > > > > > > > 1. What is the partition key for the changelog topics? Is
> it
> > > the
> > > > > same
> > > > > > > as
> > > > > > > > > the Input key or the state store key? Or maybe the thread
> > > > specifies
> > > > > > the
> > > > > > > > > partition as it knows the input partition it is subscribed
> > to?
> > > If
> > > > > the
> > > > > > > > input
> > > > > > > > > topic and state store are differently partitioned then we
> can
> > > > > explain
> > > > > > > the
> > > > > > > > > issue here.
> > > > > > > > >
> > > > > > > >
> > > > > > > > In Kafka Stream's changelog, the "partition key" of Kafka
> > > messages
> > > > is
> > > > > > the
> > > > > > > > same as the "message key" itself. And the message key is the
> > same
> > > > as
> > > > > > the
> > > > > > > > state store key.
> > > > > > > >
> > > > > > > > Since the state store here should be storing the running
> > > aggregate,
> > > > > it
> > > > > > > > means that the partition key is the same as the aggregated
> key.
> > > > > > > >
> > > > > > > > If you are doing a group-by aggregation here, where the
> > group-by
> > > > keys
> > > > > > are
> > > > > > > > different from the source input topic's keys, hence the state
> > > store
> > > > > > keys
> > > > > > > > would be different with the input topic keys.
> > > > > > > >
> > > > > > > >
> > > > > > > > > 2. Is there a background thread to persist in the state
> store
> > > > when
> > > > > > > > caching
> > > > > > > > > is disabled? When will the app commit the log for the input
> > > > topic?
> > > > > Is
> > > > > > > it
> > > > > > > > > when sink writes into the output topic or when the state
> > store
> > > > > writes
> > > > > > > > into
> > > > > > > > > the changelog topic? Because, if the app commits the record
> > > > before
> > > > > > the
> > > > > > > > data
> > > > > > > > > was written to changelog topic then we can again explain
> this
> > > > state
> > > > > > > > >
> > > > > > > > > The commit happens *after* the local state store, as well
> as
> > > the
> > > > > > > > changelog records sent by the Streams' producers, have been
> > > > flushed.
> > > > > > I.e.
> > > > > > > > if there's a failure in between, you would re-process some
> > source
> > > > > > records
> > > > > > > > and hence cause duplicates, but no data loss (a.k.a. the
> > > > > at_least_once
> > > > > > > > semantics).
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > >You may also consider upgrading to 2.6.x or higher version
> > and
> > > > see
> > > > > > if
> > > > > > > > this
> > > > > > > > > issue goes away.
> > > > > > > > > Do you mean the client or the Kafka broker? I will be
> > upgrading
> > > > the
> > > > > > > > client
> > > > > > > > > to 2.7.0 soon.
> > > > > > > > >
> > > > > > > > > I meant the client.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Sadly looking into the timestamp will not help much as we
> use
> > > > some
> > > > > > > > business
> > > > > > > > > time field to set the record timestamp. If I am right,
> there
> > is
> > > > no
> > > > > > way
> > > > > > > > now
> > > > > > > > > to know that when a Producer wrote a record in a Kafka
> topic.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Mangat
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
> > > wangguoz@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello Mangat,
> > > > > > > > > >
> > > > > > > > > > With at least once, although some records maybe processed
> > > > > multiple
> > > > > > > > times
> > > > > > > > > > their process ordering should not be violated, so what
> you
> > > > > observed
> > > > > > > is
> > > > > > > > > not
> > > > > > > > > > expected. What caught my eyes are this section in your
> > output
> > > > > > > > changelogs
> > > > > > > > > > (high-lighted):
> > > > > > > > > >
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null  (processed again)
> > > > > > > > > > Key1, V2
> > > > > > > > > > Key1, null
> > > > > > > > > >
> > > > > > > > > > *Key1, V1Key1,V2*
> > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
> but
> > > > > > > reprocessed
> > > > > > > > > V1
> > > > > > > > > > again due to reassignment)
> > > > > > > > > >
> > > > > > > > > > They seem to be the result of first receiving a tombstone
> > > which
> > > > > > > removes
> > > > > > > > > V1
> > > > > > > > > > and then a new record that adds V2. However, since
> caching
> > is
> > > > > > > disabled
> > > > > > > > > you
> > > > > > > > > > should get
> > > > > > > > > >
> > > > > > > > > > *Key1,V1*
> > > > > > > > > > *Key1,null*
> > > > > > > > > > *Key1,V2*
> > > > > > > > > >
> > > > > > > > > > instead; without the actual code snippet I cannot tell
> more
> > > > > what's
> > > > > > > > > > happening here. If you can look into the logs you can
> > record
> > > > each
> > > > > > > time
> > > > > > > > > when
> > > > > > > > > > partition migrates, how many records from the changelog
> was
> > > > > > replayed
> > > > > > > to
> > > > > > > > > > restore the store, and from which offset on the input
> topic
> > > > does
> > > > > > > > Streams
> > > > > > > > > > resume processing. You may also consider upgrading to
> 2.6.x
> > > or
> > > > > > higher
> > > > > > > > > > version and see if this issue goes away.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
> > > > mangatmodi@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey,
> > > > > > > > > > >
> > > > > > > > > > > We have the following setup in our infrastructure.
> > > > > > > > > > >
> > > > > > > > > > >    1. Kafka - 2.5.1
> > > > > > > > > > >    2. Apps use kafka streams `org.apache.kafka` version
> > > 2.5.1
> > > > > > > library
> > > > > > > > > > >    3. Low level processor API is used with
> *atleast-once*
> > > > > > semantics
> > > > > > > > > > >    4. State stores are *in-memory* with *caching
> > disabled*
> > > > and
> > > > > > > > > *changelog
> > > > > > > > > > >    enabled*
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Is it possible that during state replication and
> > partition
> > > > > > > > > reassignment,
> > > > > > > > > > > the input data is not always applied to the state
> store?
> > > > > > > > > > >
> > > > > > > > > > > 1. Let's say the input topic is having records like
> > > following
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null (tombstone)
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V3
> > > > > > > > > > > Key1, V4
> > > > > > > > > > > ```
> > > > > > > > > > > 2. The app has an aggregation function which takes
> these
> > > > record
> > > > > > and
> > > > > > > > > > update
> > > > > > > > > > > the state store so that changelog shall be
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null (tombstone)
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V3
> > > > > > > > > > > Key1, V3 + V4
> > > > > > > > > > > ```
> > > > > > > > > > > Let's say the partition responsible for processing the
> > > above
> > > > > key
> > > > > > > was
> > > > > > > > > > > several times reallocated to different threads due to
> > some
> > > > > infra
> > > > > > > > issues
> > > > > > > > > > we
> > > > > > > > > > > are having(in Kubernetes where we run the app, not the
> > > Kafka
> > > > > > > > cluster).
> > > > > > > > > > >
> > > > > > > > > > > I see the following record in the changelogs
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null  (processed again)
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1,V2
> > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
> > but
> > > > > > > > reprocessed
> > > > > > > > > > V1
> > > > > > > > > > > again due to reassignment)
> > > > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then
> V1
> > > > > > tombstone
> > > > > > > > > > should
> > > > > > > > > > > have been applied also!!)
> > > > > > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > > > > > Key1,V1
> > > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > If you see this means several things
> > > > > > > > > > > 1. The state is always correctly applied locally (in
> > > > developer
> > > > > > > > laptop),
> > > > > > > > > > > where there were no reassignments.
> > > > > > > > > > > 2. The records are processed multiple times, which is
> > > > > > > understandable
> > > > > > > > as
> > > > > > > > > > we
> > > > > > > > > > > have at least symantics here.
> > > > > > > > > > > 3. As long as we re-apply the same events in the same
> > > orders
> > > > we
> > > > > > are
> > > > > > > > > > golden
> > > > > > > > > > > but looks like some records are skipped, but here it
> > looks
> > > as
> > > > > if
> > > > > > we
> > > > > > > > > have
> > > > > > > > > > > multiple consumers reading and update the same topics,
> > > > leading
> > > > > to
> > > > > > > > race
> > > > > > > > > > > conditions.
> > > > > > > > > > >
> > > > > > > > > > > Is there any way, Kafka streams' state replication
> could
> > > lead
> > > > > to
> > > > > > > > such a
> > > > > > > > > > > race condition?
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Mangat
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Kafka Stream: State replication seems unpredictable.

Posted by mangat rai <ma...@gmail.com>.
Hey Guozhang,

Thanks for creating the issue. Yes, you are right, this will happen only
with consecutive rebalancing: after some time the zombie thread will stop
and re-join the group, and the new thread will always overwrite the state
with the latest data. In our poor infra setup, the rebalancing was
happening many times in a row.

Now, we can't guarantee that consecutive rebalancing will not happen
again (we reduced the fetch size, which fixed it in many ways). Will any of
the following work as a workaround?

1. Use a persistent store instead of in-memory. The new thread will never
get the lock, so we will lose availability but keep things consistent.
2. Use exactly-once semantics. However, we might need to redesign our apps.
It's a bigger change (a rough config sketch follows below).
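
(For option 2, a rough sketch of the config change only, assuming the usual
StreamsConfig properties; the application id and bootstrap servers are
placeholders.)

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
        // Switch from the default at_least_once so that state updates, changelog
        // writes and output records are committed atomically and zombie producers
        // are fenced by the transactional protocol.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
```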

Regards,
Mangat

On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Mangat,
>
> What you've encountered is a "zombie writer" issue, that is, Thread-2 did
> not know there's already a new rebalance and hence its partitions have been
> migrated out, until it tries to commit and then got notified of the
> illegal-generation error and realize itself is the "zombie" already. This
> case would still persist even with incremental rebalancing.
>
> I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to summarize
> the situation. Please LMK if that explanation is clear to you.
>
> On Mon, Apr 19, 2021 at 12:58 AM mangat rai <ma...@gmail.com> wrote:
>
> > Thanks, Guozhang,
> >
> > I was knocking myself with Kafka's various consumer rebalancing
> algorithms
> > in the last 2 days. Could I generalize this problem as
> >
> >
> >
> > *Any in-memory state store backed by a changelog topic will always risk
> > having interleaved writes from two different writers during rebalancing?*
> > In our case, CPU throttling made it worse as thread-2 didn't try to
> commit
> > for a long time. Also,
> >
> > 1. Do you think if we disable the incremental rebalancing, we will not
> have
> > this issue because If I understood correctly Thread-4 will not start
> > processing until the state is completely transferred from Thread-2.
> > 2. If yes, how can we disable it without downgrading the client?
> >
> > Since we have a very low scale and no real-time computing requirement, we
> > will be happy to sacrifice the availability to have consistency.
> >
> > Regards,
> > Mangat
> >
> >
> >
> > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hi Mangat:
> > >
> > > I think I found the issue of your problem here.
> > >
> > > It seems thread-2's partition was assigned to thread-4 while thread-2
> was
> > > not aware (because it missed a rebalance, this is normal scenario); in
> > > other words, thread2 becomes a "zombie". It would stay in that zombie
> > state
> > > until it tried to commit, in which it would get an error from the
> brokers
> > > and realize its zombie identity and re-joins the group.
> > >
> > > During that period of time, before the commit was issued, it would
> > continue
> > > trying to write to its local states; here are several scenarios:
> > >
> > > 1) if thread-2/4 are belonging to two different nodes then that is
> fine,
> > > since they will write to different local state stores.
> > > 2) if they belong to the same nodes, and
> > >    a) the state stores are persistent then they would have risks of
> > > contention; this is guarded by the state directory locks (as file
> locks)
> > in
> > > which case the new owner thread-4 should not be able to get on the
> local
> > > state files.
> > >    b) the state stores are in-memory, in which case that is fine since
> > the
> > > in-memory stores are kept separate as well.
> > >
> > > In your case: 2.b), the issue is that the changelog would still be
> shared
> > > between the two --- but note that this is the same case as in case 1)
> as
> > > well. And this means at that time the changelog is shared by two
> writers
> > > sending records interleaving. And if there’s a tombstone that was
> > intended
> > > for a record A, but when it was written interleaving and there’s
> another
> > > record B in between, that tombstone would effectively delete record B.
> > The
> > > key here is that, when we replay the changelogs, we replay it
> completely
> > > following offset ordering.
> > >
> > >
> > >
> > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <ma...@gmail.com>
> wrote:
> > >
> > > > Guozhang,
> > > >
> > > > Yes, you are correct. We have our own group processor. I have more
> > > > information now.
> > > >
> > > > 1. I added ThreadId in the data when the app persists into the
> > changelog
> > > > topic.
> > > > 2. Thread-2 which was working with partition-0 had a timeout issue.
> > > > 4. Thread-4 picked up this partition-0 as I can see its Id in the
> > > > changelog.
> > > > 5. *But then Thread-2 and Thread-4 both were writing into the
> > partition-0
> > > > of the changelog, that too for the same key.*
> > > >
> > > > So I was clearly able to see that two threads were overwriting data
> of
> > > one
> > > > another into the state store leading to a corrupted state. This
> > confirms
> > > my
> > > > theory that it was an issue of concurrent update. This was something
> > > > totally unexpected. I suspect that Thread-2 continues to persist its
> > > > in-memory state, maybe because It wasn't stopped after the timeout
> > > > exception. Is there a configuration possible in the Kafka stream
> which
> > > > could lead to this?
> > > >
> > > > There was no network issue, our CPU was highly throttled by
> Kubernetes.
> > > We
> > > > gave more resources, also decreased the fetch-size so we have more
> I/O
> > to
> > > > Cpu time ratio than before, and then there was no timeout issue,
> hence
> > no
> > > > reassignment and hence no corrupted state.
> > > >
> > > > I really appreciate your help here...
> > > > Thanks!
> > > > Mangat
> > > >
> > > >
> > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hey Mangat,
> > > > >
> > > > > A. With at least once, Streams does not make sure atomicity of 1) /
> > 2);
> > > > > with exactly once, atomicity is indeed guaranteed with
> transactional
> > > > > messaging.
> > > > >
> > > > > B. If you are using processor API, then I'm assuming you did your
> own
> > > > > group-by processor right? In that case, the partition key would
> just
> > be
> > > > the
> > > > > record key when you are sending to the repartition topic.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks again, that makes things clear. I still have some
> questions
> > > here
> > > > > > then.
> > > > > >
> > > > > > A.  For each record we read, we do two updates
> > > > > >       1. Changelog topic of the state store.
> > > > > >       2. Output topic aka sink.
> > > > > >       Does the Kafka stream app make sure that either both are
> > > > committed
> > > > > or
> > > > > > neither?
> > > > > >
> > > > > > B.  Out Input topic actually has the as (a,b,c), but we partition
> > > with
> > > > > only
> > > > > > (a). We do this because we have different compaction requirements
> > > than
> > > > > the
> > > > > > partitions. It will still work as all (a,b,c) records will go to
> > the
> > > > same
> > > > > > partition. Now in aggregation, we group by (a,b,c). In such case
> > what
> > > > > will
> > > > > > be the partition key for the changelog topic?
> > > > > >
> > > > > > Note that we use low-level processor API and don't commit
> > ourselves.
> > > > > >
> > > > > > Regards,
> > > > > > Mangat
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Mangat,
> > > > > > >
> > > > > > > Please see my replies inline below.
> > > > > > >
> > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <
> mangatmodi@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > @Guozhang Wang
> > > > > > > >
> > > > > > > > Thanks for the reply.  Indeed I am finding it difficult to
> > > explain
> > > > > this
> > > > > > > > state. I checked the code many times. There can be a bug but
> I
> > > fail
> > > > > to
> > > > > > > see
> > > > > > > > it. There are several things about the Kafka streams that I
> > don't
> > > > > > > > understand, which makes it harder for me to reason.
> > > > > > > >
> > > > > > > > 1. What is the partition key for the changelog topics? Is it
> > the
> > > > same
> > > > > > as
> > > > > > > > the Input key or the state store key? Or maybe the thread
> > > specifies
> > > > > the
> > > > > > > > partition as it knows the input partition it is subscribed
> to?
> > If
> > > > the
> > > > > > > input
> > > > > > > > topic and state store are differently partitioned then we can
> > > > explain
> > > > > > the
> > > > > > > > issue here.
> > > > > > > >
> > > > > > >
> > > > > > > In Kafka Stream's changelog, the "partition key" of Kafka
> > messages
> > > is
> > > > > the
> > > > > > > same as the "message key" itself. And the message key is the
> same
> > > as
> > > > > the
> > > > > > > state store key.
> > > > > > >
> > > > > > > Since the state store here should be storing the running
> > aggregate,
> > > > it
> > > > > > > means that the partition key is the same as the aggregated key.
> > > > > > >
> > > > > > > If you are doing a group-by aggregation here, where the
> group-by
> > > keys
> > > > > are
> > > > > > > different from the source input topic's keys, hence the state
> > store
> > > > > keys
> > > > > > > would be different with the input topic keys.
> > > > > > >
> > > > > > >
> > > > > > > > 2. Is there a background thread to persist in the state store
> > > when
> > > > > > > caching
> > > > > > > > is disabled? When will the app commit the log for the input
> > > topic?
> > > > Is
> > > > > > it
> > > > > > > > when sink writes into the output topic or when the state
> store
> > > > writes
> > > > > > > into
> > > > > > > > the changelog topic? Because, if the app commits the record
> > > before
> > > > > the
> > > > > > > data
> > > > > > > > was written to changelog topic then we can again explain this
> > > state
> > > > > > > >
> > > > > > > > The commit happens *after* the local state store, as well as
> > the
> > > > > > > changelog records sent by the Streams' producers, have been
> > > flushed.
> > > > > I.e.
> > > > > > > if there's a failure in between, you would re-process some
> source
> > > > > records
> > > > > > > and hence cause duplicates, but no data loss (a.k.a. the
> > > > at_least_once
> > > > > > > semantics).
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > >You may also consider upgrading to 2.6.x or higher version
> and
> > > see
> > > > > if
> > > > > > > this
> > > > > > > > issue goes away.
> > > > > > > > Do you mean the client or the Kafka broker? I will be
> upgrading
> > > the
> > > > > > > client
> > > > > > > > to 2.7.0 soon.
> > > > > > > >
> > > > > > > > I meant the client.
> > > > > > >
> > > > > > >
> > > > > > > > Sadly looking into the timestamp will not help much as we use
> > > some
> > > > > > > business
> > > > > > > > time field to set the record timestamp. If I am right, there
> is
> > > no
> > > > > way
> > > > > > > now
> > > > > > > > to know that when a Producer wrote a record in a Kafka topic.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Mangat
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello Mangat,
> > > > > > > > >
> > > > > > > > > With at least once, although some records maybe processed
> > > > multiple
> > > > > > > times
> > > > > > > > > their process ordering should not be violated, so what you
> > > > observed
> > > > > > is
> > > > > > > > not
> > > > > > > > > expected. What caught my eyes are this section in your
> output
> > > > > > > changelogs
> > > > > > > > > (high-lighted):
> > > > > > > > >
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null  (processed again)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > >
> > > > > > > > > *Key1, V1Key1,V2*
> > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > > reprocessed
> > > > > > > > V1
> > > > > > > > > again due to reassignment)
> > > > > > > > >
> > > > > > > > > They seem to be the result of first receiving a tombstone
> > which
> > > > > > removes
> > > > > > > > V1
> > > > > > > > > and then a new record that adds V2. However, since caching
> is
> > > > > > disabled
> > > > > > > > you
> > > > > > > > > should get
> > > > > > > > >
> > > > > > > > > *Key1,V1*
> > > > > > > > > *Key1,null*
> > > > > > > > > *Key1,V2*
> > > > > > > > >
> > > > > > > > > instead; without the actual code snippet I cannot tell more
> > > > what's
> > > > > > > > > happening here. If you can look into the logs you can
> record
> > > each
> > > > > > time
> > > > > > > > when
> > > > > > > > > partition migrates, how many records from the changelog was
> > > > > replayed
> > > > > > to
> > > > > > > > > restore the store, and from which offset on the input topic
> > > does
> > > > > > > Streams
> > > > > > > > > resume processing. You may also consider upgrading to 2.6.x
> > or
> > > > > higher
> > > > > > > > > version and see if this issue goes away.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
> > > mangatmodi@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hey,
> > > > > > > > > >
> > > > > > > > > > We have the following setup in our infrastructure.
> > > > > > > > > >
> > > > > > > > > >    1. Kafka - 2.5.1
> > > > > > > > > >    2. Apps use kafka streams `org.apache.kafka` version
> > 2.5.1
> > > > > > library
> > > > > > > > > >    3. Low level processor API is used with *atleast-once*
> > > > > semantics
> > > > > > > > > >    4. State stores are *in-memory* with *caching
> disabled*
> > > and
> > > > > > > > *changelog
> > > > > > > > > >    enabled*
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Is it possible that during state replication and
> partition
> > > > > > > > reassignment,
> > > > > > > > > > the input data is not always applied to the state store?
> > > > > > > > > >
> > > > > > > > > > 1. Let's say the input topic is having records like
> > following
> > > > > > > > > >
> > > > > > > > > > ```
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null (tombstone)
> > > > > > > > > > Key1, V2
> > > > > > > > > > Key1, null
> > > > > > > > > > Key1, V3
> > > > > > > > > > Key1, V4
> > > > > > > > > > ```
> > > > > > > > > > 2. The app has an aggregation function which takes these
> > > record
> > > > > and
> > > > > > > > > update
> > > > > > > > > > the state store so that changelog shall be
> > > > > > > > > >
> > > > > > > > > > ```
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null (tombstone)
> > > > > > > > > > Key1, V2
> > > > > > > > > > Key1, null
> > > > > > > > > > Key1, V3
> > > > > > > > > > Key1, V3 + V4
> > > > > > > > > > ```
> > > > > > > > > > Let's say the partition responsible for processing the
> > above
> > > > key
> > > > > > was
> > > > > > > > > > several times reallocated to different threads due to
> some
> > > > infra
> > > > > > > issues
> > > > > > > > > we
> > > > > > > > > > are having(in Kubernetes where we run the app, not the
> > Kafka
> > > > > > > cluster).
> > > > > > > > > >
> > > > > > > > > > I see the following record in the changelogs
> > > > > > > > > >
> > > > > > > > > > ```
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null  (processed again)
> > > > > > > > > > Key1, V2
> > > > > > > > > > Key1, null
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1,V2
> > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
> but
> > > > > > > reprocessed
> > > > > > > > > V1
> > > > > > > > > > again due to reassignment)
> > > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1
> > > > > tombstone
> > > > > > > > > should
> > > > > > > > > > have been applied also!!)
> > > > > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > > > > Key1,V1
> > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > > > > > ```
> > > > > > > > > >
> > > > > > > > > > If you see this means several things
> > > > > > > > > > 1. The state is always correctly applied locally (in
> > > developer
> > > > > > > laptop),
> > > > > > > > > > where there were no reassignments.
> > > > > > > > > > 2. The records are processed multiple times, which is
> > > > > > understandable
> > > > > > > as
> > > > > > > > > we
> > > > > > > > > > have at least symantics here.
> > > > > > > > > > 3. As long as we re-apply the same events in the same
> > orders
> > > we
> > > > > are
> > > > > > > > > golden
> > > > > > > > > > but looks like some records are skipped, but here it
> looks
> > as
> > > > if
> > > > > we
> > > > > > > > have
> > > > > > > > > > multiple consumers reading and update the same topics,
> > > leading
> > > > to
> > > > > > > race
> > > > > > > > > > conditions.
> > > > > > > > > >
> > > > > > > > > > Is there any way, Kafka streams' state replication could
> > lead
> > > > to
> > > > > > > such a
> > > > > > > > > > race condition?
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Mangat
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Mangat,

What you've encountered is a "zombie writer" issue, that is, Thread-2 did
not know that a new rebalance had already happened and that its partitions
had been migrated out, until it tried to commit, got notified of the
illegal-generation error, and realized it was already the "zombie". This
case would still persist even with incremental rebalancing.

I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to summarize
the situation. Please LMK if that explanation is clear to you.
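
As a side note: one way to have such zombie writes fenced is to run the app
with the exactly-once processing guarantee, since a zombie's transactional
producer gets fenced when it tries to commit and its pending writes are
aborted. A minimal sketch, assuming you can afford EOS (the application id
and bootstrap servers below are placeholders, not from this thread):

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
// with exactly_once, a zombie's uncommitted writes are aborted when its commit is fenced
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
```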

On Mon, Apr 19, 2021 at 12:58 AM mangat rai <ma...@gmail.com> wrote:

> Thanks, Guozhang,
>
> I was knocking myself with Kafka's various consumer rebalancing algorithms
> in the last 2 days. Could I generalize this problem as
>
>
>
> *Any in-memory state store backed by a changelog topic will always risk
> having interleaved writes from two different writers during rebalancing?*
> In our case, CPU throttling made it worse as thread-2 didn't try to commit
> for a long time. Also,
>
> 1. Do you think if we disable the incremental rebalancing, we will not have
> this issue because If I understood correctly Thread-4 will not start
> processing until the state is completely transferred from Thread-2.
> 2. If yes, how can we disable it without downgrading the client?
>
> Since we have a very low scale and no real-time computing requirement, we
> will be happy to sacrifice the availability to have consistency.
>
> Regards,
> Mangat
>
>
>
> On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Mangat:
> >
> > I think I found the issue of your problem here.
> >
> > It seems thread-2's partition was assigned to thread-4 while thread-2 was
> > not aware (because it missed a rebalance, this is normal scenario); in
> > other words, thread2 becomes a "zombie". It would stay in that zombie
> state
> > until it tried to commit, in which it would get an error from the brokers
> > and realize its zombie identity and re-joins the group.
> >
> > During that period of time, before the commit was issued, it would
> continue
> > trying to write to its local states; here are several scenarios:
> >
> > 1) if thread-2/4 are belonging to two different nodes then that is fine,
> > since they will write to different local state stores.
> > 2) if they belong to the same nodes, and
> >    a) the state stores are persistent then they would have risks of
> > contention; this is guarded by the state directory locks (as file locks)
> in
> > which case the new owner thread-4 should not be able to get on the local
> > state files.
> >    b) the state stores are in-memory, in which case that is fine since
> the
> > in-memory stores are kept separate as well.
> >
> > In your case: 2.b), the issue is that the changelog would still be shared
> > between the two --- but note that this is the same case as in case 1) as
> > well. And this means at that time the changelog is shared by two writers
> > sending records interleaving. And if there’s a tombstone that was
> intended
> > for a record A, but when it was written interleaving and there’s another
> > record B in between, that tombstone would effectively delete record B.
> The
> > key here is that, when we replay the changelogs, we replay it completely
> > following offset ordering.
> >
> >
> >
> > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <ma...@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > Yes, you are correct. We have our own group processor. I have more
> > > information now.
> > >
> > > 1. I added ThreadId in the data when the app persists into the
> changelog
> > > topic.
> > > 2. Thread-2 which was working with partition-0 had a timeout issue.
> > > 4. Thread-4 picked up this partition-0 as I can see its Id in the
> > > changelog.
> > > 5. *But then Thread-2 and Thread-4 both were writing into the
> partition-0
> > > of the changelog, that too for the same key.*
> > >
> > > So I was clearly able to see that two threads were overwriting data of
> > one
> > > another into the state store leading to a corrupted state. This
> confirms
> > my
> > > theory that it was an issue of concurrent update. This was something
> > > totally unexpected. I suspect that Thread-2 continues to persist its
> > > in-memory state, maybe because It wasn't stopped after the timeout
> > > exception. Is there a configuration possible in the Kafka stream which
> > > could lead to this?
> > >
> > > There was no network issue, our CPU was highly throttled by Kubernetes.
> > We
> > > gave more resources, also decreased the fetch-size so we have more I/O
> to
> > > Cpu time ratio than before, and then there was no timeout issue, hence
> no
> > > reassignment and hence no corrupted state.
> > >
> > > I really appreciate your help here...
> > > Thanks!
> > > Mangat
> > >
> > >
> > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Hey Mangat,
> > > >
> > > > A. With at least once, Streams does not make sure atomicity of 1) /
> 2);
> > > > with exactly once, atomicity is indeed guaranteed with transactional
> > > > messaging.
> > > >
> > > > B. If you are using processor API, then I'm assuming you did your own
> > > > group-by processor right? In that case, the partition key would just
> be
> > > the
> > > > record key when you are sending to the repartition topic.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks again, that makes things clear. I still have some questions
> > here
> > > > > then.
> > > > >
> > > > > A.  For each record we read, we do two updates
> > > > >       1. Changelog topic of the state store.
> > > > >       2. Output topic aka sink.
> > > > >       Does the Kafka stream app make sure that either both are
> > > committed
> > > > or
> > > > > neither?
> > > > >
> > > > > B.  Out Input topic actually has the as (a,b,c), but we partition
> > with
> > > > only
> > > > > (a). We do this because we have different compaction requirements
> > than
> > > > the
> > > > > partitions. It will still work as all (a,b,c) records will go to
> the
> > > same
> > > > > partition. Now in aggregation, we group by (a,b,c). In such case
> what
> > > > will
> > > > > be the partition key for the changelog topic?
> > > > >
> > > > > Note that we use low-level processor API and don't commit
> ourselves.
> > > > >
> > > > > Regards,
> > > > > Mangat
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Mangat,
> > > > > >
> > > > > > Please see my replies inline below.
> > > > > >
> > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > @Guozhang Wang
> > > > > > >
> > > > > > > Thanks for the reply.  Indeed I am finding it difficult to
> > explain
> > > > this
> > > > > > > state. I checked the code many times. There can be a bug but I
> > fail
> > > > to
> > > > > > see
> > > > > > > it. There are several things about the Kafka streams that I
> don't
> > > > > > > understand, which makes it harder for me to reason.
> > > > > > >
> > > > > > > 1. What is the partition key for the changelog topics? Is it
> the
> > > same
> > > > > as
> > > > > > > the Input key or the state store key? Or maybe the thread
> > specifies
> > > > the
> > > > > > > partition as it knows the input partition it is subscribed to?
> If
> > > the
> > > > > > input
> > > > > > > topic and state store are differently partitioned then we can
> > > explain
> > > > > the
> > > > > > > issue here.
> > > > > > >
> > > > > >
> > > > > > In Kafka Stream's changelog, the "partition key" of Kafka
> messages
> > is
> > > > the
> > > > > > same as the "message key" itself. And the message key is the same
> > as
> > > > the
> > > > > > state store key.
> > > > > >
> > > > > > Since the state store here should be storing the running
> aggregate,
> > > it
> > > > > > means that the partition key is the same as the aggregated key.
> > > > > >
> > > > > > If you are doing a group-by aggregation here, where the group-by
> > keys
> > > > are
> > > > > > different from the source input topic's keys, hence the state
> store
> > > > keys
> > > > > > would be different with the input topic keys.
> > > > > >
> > > > > >
> > > > > > > 2. Is there a background thread to persist in the state store
> > when
> > > > > > caching
> > > > > > > is disabled? When will the app commit the log for the input
> > topic?
> > > Is
> > > > > it
> > > > > > > when sink writes into the output topic or when the state store
> > > writes
> > > > > > into
> > > > > > > the changelog topic? Because, if the app commits the record
> > before
> > > > the
> > > > > > data
> > > > > > > was written to changelog topic then we can again explain this
> > state
> > > > > > >
> > > > > > > The commit happens *after* the local state store, as well as
> the
> > > > > > changelog records sent by the Streams' producers, have been
> > flushed.
> > > > I.e.
> > > > > > if there's a failure in between, you would re-process some source
> > > > records
> > > > > > and hence cause duplicates, but no data loss (a.k.a. the
> > > at_least_once
> > > > > > semantics).
> > > > > >
> > > > > >
> > > > > >
> > > > > > > >You may also consider upgrading to 2.6.x or higher version and
> > see
> > > > if
> > > > > > this
> > > > > > > issue goes away.
> > > > > > > Do you mean the client or the Kafka broker? I will be upgrading
> > the
> > > > > > client
> > > > > > > to 2.7.0 soon.
> > > > > > >
> > > > > > > I meant the client.
> > > > > >
> > > > > >
> > > > > > > Sadly looking into the timestamp will not help much as we use
> > some
> > > > > > business
> > > > > > > time field to set the record timestamp. If I am right, there is
> > no
> > > > way
> > > > > > now
> > > > > > > to know that when a Producer wrote a record in a Kafka topic.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mangat
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hello Mangat,
> > > > > > > >
> > > > > > > > With at least once, although some records maybe processed
> > > multiple
> > > > > > times
> > > > > > > > their process ordering should not be violated, so what you
> > > observed
> > > > > is
> > > > > > > not
> > > > > > > > expected. What caught my eyes are this section in your output
> > > > > > changelogs
> > > > > > > > (high-lighted):
> > > > > > > >
> > > > > > > > Key1, V1
> > > > > > > > Key1, null
> > > > > > > > Key1, V1
> > > > > > > > Key1, null  (processed again)
> > > > > > > > Key1, V2
> > > > > > > > Key1, null
> > > > > > > >
> > > > > > > > *Key1, V1Key1,V2*
> > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > reprocessed
> > > > > > > V1
> > > > > > > > again due to reassignment)
> > > > > > > >
> > > > > > > > They seem to be the result of first receiving a tombstone
> which
> > > > > removes
> > > > > > > V1
> > > > > > > > and then a new record that adds V2. However, since caching is
> > > > > disabled
> > > > > > > you
> > > > > > > > should get
> > > > > > > >
> > > > > > > > *Key1,V1*
> > > > > > > > *Key1,null*
> > > > > > > > *Key1,V2*
> > > > > > > >
> > > > > > > > instead; without the actual code snippet I cannot tell more
> > > what's
> > > > > > > > happening here. If you can look into the logs you can record
> > each
> > > > > time
> > > > > > > when
> > > > > > > > partition migrates, how many records from the changelog was
> > > > replayed
> > > > > to
> > > > > > > > restore the store, and from which offset on the input topic
> > does
> > > > > > Streams
> > > > > > > > resume processing. You may also consider upgrading to 2.6.x
> or
> > > > higher
> > > > > > > > version and see if this issue goes away.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
> > mangatmodi@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey,
> > > > > > > > >
> > > > > > > > > We have the following setup in our infrastructure.
> > > > > > > > >
> > > > > > > > >    1. Kafka - 2.5.1
> > > > > > > > >    2. Apps use kafka streams `org.apache.kafka` version
> 2.5.1
> > > > > library
> > > > > > > > >    3. Low level processor API is used with *atleast-once*
> > > > semantics
> > > > > > > > >    4. State stores are *in-memory* with *caching disabled*
> > and
> > > > > > > *changelog
> > > > > > > > >    enabled*
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Is it possible that during state replication and partition
> > > > > > > reassignment,
> > > > > > > > > the input data is not always applied to the state store?
> > > > > > > > >
> > > > > > > > > 1. Let's say the input topic is having records like
> following
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null (tombstone)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V3
> > > > > > > > > Key1, V4
> > > > > > > > > ```
> > > > > > > > > 2. The app has an aggregation function which takes these
> > record
> > > > and
> > > > > > > > update
> > > > > > > > > the state store so that changelog shall be
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null (tombstone)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V3
> > > > > > > > > Key1, V3 + V4
> > > > > > > > > ```
> > > > > > > > > Let's say the partition responsible for processing the
> above
> > > key
> > > > > was
> > > > > > > > > several times reallocated to different threads due to some
> > > infra
> > > > > > issues
> > > > > > > > we
> > > > > > > > > are having(in Kubernetes where we run the app, not the
> Kafka
> > > > > > cluster).
> > > > > > > > >
> > > > > > > > > I see the following record in the changelogs
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V1
> > > > > > > > > Key1, null  (processed again)
> > > > > > > > > Key1, V2
> > > > > > > > > Key1, null
> > > > > > > > > Key1, V1
> > > > > > > > > Key1,V2
> > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > > reprocessed
> > > > > > > > V1
> > > > > > > > > again due to reassignment)
> > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1
> > > > tombstone
> > > > > > > > should
> > > > > > > > > have been applied also!!)
> > > > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > > > Key1,V1
> > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > If you see this means several things
> > > > > > > > > 1. The state is always correctly applied locally (in
> > developer
> > > > > > laptop),
> > > > > > > > > where there were no reassignments.
> > > > > > > > > 2. The records are processed multiple times, which is
> > > > > understandable
> > > > > > as
> > > > > > > > we
> > > > > > > > > have at least symantics here.
> > > > > > > > > 3. As long as we re-apply the same events in the same
> orders
> > we
> > > > are
> > > > > > > > golden
> > > > > > > > > but looks like some records are skipped, but here it looks
> as
> > > if
> > > > we
> > > > > > > have
> > > > > > > > > multiple consumers reading and update the same topics,
> > leading
> > > to
> > > > > > race
> > > > > > > > > conditions.
> > > > > > > > >
> > > > > > > > > Is there any way, Kafka streams' state replication could
> lead
> > > to
> > > > > > such a
> > > > > > > > > race condition?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Mangat
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Kafka Stream: State replication seems unpredictable.

Posted by mangat rai <ma...@gmail.com>.
Thanks, Guozhang,

I have been wrestling with Kafka's various consumer rebalancing algorithms
for the last 2 days. Could I generalize this problem as follows:



*Any in-memory state store backed by a changelog topic will always risk
having interleaved writes from two different writers during rebalancing?*
In our case, CPU throttling made it worse as thread-2 didn't try to commit
for a long time. Also,

1. Do you think that if we disable incremental rebalancing, we will not
have this issue? If I understood correctly, Thread-4 will not start
processing until the state is completely transferred from Thread-2.
2. If yes, how can we disable it without downgrading the client?

Since we operate at a very low scale and have no real-time computing
requirement, we are happy to sacrifice availability for consistency.
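
For reference, the state store in question is declared roughly like this (a
minimal sketch, not our actual code; the store name and serdes are
placeholders):

```
import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// in-memory store, caching disabled, changelog (logging) enabled
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("agg-store"),  // placeholder name
            Serdes.String(), Serdes.String())
        .withCachingDisabled()
        .withLoggingEnabled(Collections.emptyMap());    // backs the store with a changelog topic
```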

Regards,
Mangat



On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Mangat:
>
> I think I found the issue of your problem here.
>
> It seems thread-2's partition was assigned to thread-4 while thread-2 was
> not aware (because it missed a rebalance, this is normal scenario); in
> other words, thread2 becomes a "zombie". It would stay in that zombie state
> until it tried to commit, in which it would get an error from the brokers
> and realize its zombie identity and re-joins the group.
>
> During that period of time, before the commit was issued, it would continue
> trying to write to its local states; here are several scenarios:
>
> 1) if thread-2/4 are belonging to two different nodes then that is fine,
> since they will write to different local state stores.
> 2) if they belong to the same nodes, and
>    a) the state stores are persistent then they would have risks of
> contention; this is guarded by the state directory locks (as file locks) in
> which case the new owner thread-4 should not be able to get on the local
> state files.
>    b) the state stores are in-memory, in which case that is fine since the
> in-memory stores are kept separate as well.
>
> In your case: 2.b), the issue is that the changelog would still be shared
> between the two --- but note that this is the same case as in case 1) as
> well. And this means at that time the changelog is shared by two writers
> sending records interleaving. And if there’s a tombstone that was intended
> for a record A, but when it was written interleaving and there’s another
> record B in between, that tombstone would effectively delete record B. The
> key here is that, when we replay the changelogs, we replay it completely
> following offset ordering.
>
>
>
> On Thu, Apr 15, 2021 at 2:28 AM mangat rai <ma...@gmail.com> wrote:
>
> > Guozhang,
> >
> > Yes, you are correct. We have our own group processor. I have more
> > information now.
> >
> > 1. I added ThreadId in the data when the app persists into the changelog
> > topic.
> > 2. Thread-2 which was working with partition-0 had a timeout issue.
> > 4. Thread-4 picked up this partition-0 as I can see its Id in the
> > changelog.
> > 5. *But then Thread-2 and Thread-4 both were writing into the partition-0
> > of the changelog, that too for the same key.*
> >
> > So I was clearly able to see that two threads were overwriting data of
> one
> > another into the state store leading to a corrupted state. This confirms
> my
> > theory that it was an issue of concurrent update. This was something
> > totally unexpected. I suspect that Thread-2 continues to persist its
> > in-memory state, maybe because It wasn't stopped after the timeout
> > exception. Is there a configuration possible in the Kafka stream which
> > could lead to this?
> >
> > There was no network issue, our CPU was highly throttled by Kubernetes.
> We
> > gave more resources, also decreased the fetch-size so we have more I/O to
> > Cpu time ratio than before, and then there was no timeout issue, hence no
> > reassignment and hence no corrupted state.
> >
> > I really appreciate your help here...
> > Thanks!
> > Mangat
> >
> >
> > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hey Mangat,
> > >
> > > A. With at least once, Streams does not make sure atomicity of 1) / 2);
> > > with exactly once, atomicity is indeed guaranteed with transactional
> > > messaging.
> > >
> > > B. If you are using processor API, then I'm assuming you did your own
> > > group-by processor right? In that case, the partition key would just be
> > the
> > > record key when you are sending to the repartition topic.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com>
> wrote:
> > >
> > > > Thanks again, that makes things clear. I still have some questions
> here
> > > > then.
> > > >
> > > > A.  For each record we read, we do two updates
> > > >       1. Changelog topic of the state store.
> > > >       2. Output topic aka sink.
> > > >       Does the Kafka stream app make sure that either both are
> > committed
> > > or
> > > > neither?
> > > >
> > > > B.  Out Input topic actually has the as (a,b,c), but we partition
> with
> > > only
> > > > (a). We do this because we have different compaction requirements
> than
> > > the
> > > > partitions. It will still work as all (a,b,c) records will go to the
> > same
> > > > partition. Now in aggregation, we group by (a,b,c). In such case what
> > > will
> > > > be the partition key for the changelog topic?
> > > >
> > > > Note that we use low-level processor API and don't commit ourselves.
> > > >
> > > > Regards,
> > > > Mangat
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Mangat,
> > > > >
> > > > > Please see my replies inline below.
> > > > >
> > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com>
> > > wrote:
> > > > >
> > > > > > @Guozhang Wang
> > > > > >
> > > > > > Thanks for the reply.  Indeed I am finding it difficult to
> explain
> > > this
> > > > > > state. I checked the code many times. There can be a bug but I
> fail
> > > to
> > > > > see
> > > > > > it. There are several things about the Kafka streams that I don't
> > > > > > understand, which makes it harder for me to reason.
> > > > > >
> > > > > > 1. What is the partition key for the changelog topics? Is it the
> > same
> > > > as
> > > > > > the Input key or the state store key? Or maybe the thread
> specifies
> > > the
> > > > > > partition as it knows the input partition it is subscribed to? If
> > the
> > > > > input
> > > > > > topic and state store are differently partitioned then we can
> > explain
> > > > the
> > > > > > issue here.
> > > > > >
> > > > >
> > > > > In Kafka Stream's changelog, the "partition key" of Kafka messages
> is
> > > the
> > > > > same as the "message key" itself. And the message key is the same
> as
> > > the
> > > > > state store key.
> > > > >
> > > > > Since the state store here should be storing the running aggregate,
> > it
> > > > > means that the partition key is the same as the aggregated key.
> > > > >
> > > > > If you are doing a group-by aggregation here, where the group-by
> keys
> > > are
> > > > > different from the source input topic's keys, hence the state store
> > > keys
> > > > > would be different with the input topic keys.
> > > > >
> > > > >
> > > > > > 2. Is there a background thread to persist in the state store
> when
> > > > > caching
> > > > > > is disabled? When will the app commit the log for the input
> topic?
> > Is
> > > > it
> > > > > > when sink writes into the output topic or when the state store
> > writes
> > > > > into
> > > > > > the changelog topic? Because, if the app commits the record
> before
> > > the
> > > > > data
> > > > > > was written to changelog topic then we can again explain this
> state
> > > > > >
> > > > > > The commit happens *after* the local state store, as well as the
> > > > > changelog records sent by the Streams' producers, have been
> flushed.
> > > I.e.
> > > > > if there's a failure in between, you would re-process some source
> > > records
> > > > > and hence cause duplicates, but no data loss (a.k.a. the
> > at_least_once
> > > > > semantics).
> > > > >
> > > > >
> > > > >
> > > > > > >You may also consider upgrading to 2.6.x or higher version and
> see
> > > if
> > > > > this
> > > > > > issue goes away.
> > > > > > Do you mean the client or the Kafka broker? I will be upgrading
> the
> > > > > client
> > > > > > to 2.7.0 soon.
> > > > > >
> > > > > > I meant the client.
> > > > >
> > > > >
> > > > > > Sadly looking into the timestamp will not help much as we use
> some
> > > > > business
> > > > > > time field to set the record timestamp. If I am right, there is
> no
> > > way
> > > > > now
> > > > > > to know that when a Producer wrote a record in a Kafka topic.
> > > > > >
> > > > > > Regards,
> > > > > > Mangat
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hello Mangat,
> > > > > > >
> > > > > > > With at least once, although some records maybe processed
> > multiple
> > > > > times
> > > > > > > their process ordering should not be violated, so what you
> > observed
> > > > is
> > > > > > not
> > > > > > > expected. What caught my eyes are this section in your output
> > > > > changelogs
> > > > > > > (high-lighted):
> > > > > > >
> > > > > > > Key1, V1
> > > > > > > Key1, null
> > > > > > > Key1, V1
> > > > > > > Key1, null  (processed again)
> > > > > > > Key1, V2
> > > > > > > Key1, null
> > > > > > >
> > > > > > > *Key1, V1Key1,V2*
> > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > reprocessed
> > > > > > V1
> > > > > > > again due to reassignment)
> > > > > > >
> > > > > > > They seem to be the result of first receiving a tombstone which
> > > > removes
> > > > > > V1
> > > > > > > and then a new record that adds V2. However, since caching is
> > > > disabled
> > > > > > you
> > > > > > > should get
> > > > > > >
> > > > > > > *Key1,V1*
> > > > > > > *Key1,null*
> > > > > > > *Key1,V2*
> > > > > > >
> > > > > > > instead; without the actual code snippet I cannot tell more
> > what's
> > > > > > > happening here. If you can look into the logs you can record
> each
> > > > time
> > > > > > when
> > > > > > > partition migrates, how many records from the changelog was
> > > replayed
> > > > to
> > > > > > > restore the store, and from which offset on the input topic
> does
> > > > > Streams
> > > > > > > resume processing. You may also consider upgrading to 2.6.x or
> > > higher
> > > > > > > version and see if this issue goes away.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <
> mangatmodi@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hey,
> > > > > > > >
> > > > > > > > We have the following setup in our infrastructure.
> > > > > > > >
> > > > > > > >    1. Kafka - 2.5.1
> > > > > > > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1
> > > > library
> > > > > > > >    3. Low level processor API is used with *atleast-once*
> > > semantics
> > > > > > > >    4. State stores are *in-memory* with *caching disabled*
> and
> > > > > > *changelog
> > > > > > > >    enabled*
> > > > > > > >
> > > > > > > >
> > > > > > > > Is it possible that during state replication and partition
> > > > > > reassignment,
> > > > > > > > the input data is not always applied to the state store?
> > > > > > > >
> > > > > > > > 1. Let's say the input topic is having records like following
> > > > > > > >
> > > > > > > > ```
> > > > > > > > Key1, V1
> > > > > > > > Key1, null (tombstone)
> > > > > > > > Key1, V2
> > > > > > > > Key1, null
> > > > > > > > Key1, V3
> > > > > > > > Key1, V4
> > > > > > > > ```
> > > > > > > > 2. The app has an aggregation function which takes these
> record
> > > and
> > > > > > > update
> > > > > > > > the state store so that changelog shall be
> > > > > > > >
> > > > > > > > ```
> > > > > > > > Key1, V1
> > > > > > > > Key1, null (tombstone)
> > > > > > > > Key1, V2
> > > > > > > > Key1, null
> > > > > > > > Key1, V3
> > > > > > > > Key1, V3 + V4
> > > > > > > > ```
> > > > > > > > Let's say the partition responsible for processing the above
> > key
> > > > was
> > > > > > > > several times reallocated to different threads due to some
> > infra
> > > > > issues
> > > > > > > we
> > > > > > > > are having(in Kubernetes where we run the app, not the Kafka
> > > > > cluster).
> > > > > > > >
> > > > > > > > I see the following record in the changelogs
> > > > > > > >
> > > > > > > > ```
> > > > > > > > Key1, V1
> > > > > > > > Key1, null
> > > > > > > > Key1, V1
> > > > > > > > Key1, null  (processed again)
> > > > > > > > Key1, V2
> > > > > > > > Key1, null
> > > > > > > > Key1, V1
> > > > > > > > Key1,V2
> > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > reprocessed
> > > > > > > V1
> > > > > > > > again due to reassignment)
> > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1
> > > tombstone
> > > > > > > should
> > > > > > > > have been applied also!!)
> > > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > > Key1,V1
> > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > > > ```
> > > > > > > >
> > > > > > > > If you see this means several things
> > > > > > > > 1. The state is always correctly applied locally (in
> developer
> > > > > laptop),
> > > > > > > > where there were no reassignments.
> > > > > > > > 2. The records are processed multiple times, which is
> > > > understandable
> > > > > as
> > > > > > > we
> > > > > > > > have at least symantics here.
> > > > > > > > 3. As long as we re-apply the same events in the same orders
> we
> > > are
> > > > > > > golden
> > > > > > > > but looks like some records are skipped, but here it looks as
> > if
> > > we
> > > > > > have
> > > > > > > > multiple consumers reading and update the same topics,
> leading
> > to
> > > > > race
> > > > > > > > conditions.
> > > > > > > >
> > > > > > > > Is there any way, Kafka streams' state replication could lead
> > to
> > > > > such a
> > > > > > > > race condition?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Mangat
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mangat:

I think I found the cause of your problem here.

It seems thread-2's partition was assigned to thread-4 while thread-2 was
not aware of it (because it missed a rebalance, which is a normal scenario);
in other words, thread-2 became a "zombie". It would stay in that zombie
state until it tried to commit, at which point it would get an error from
the brokers, realize its zombie identity, and re-join the group.

During that period of time, before the commit was issued, it would continue
trying to write to its local states; here are several scenarios:

1) if thread-2/4 belong to two different nodes then that is fine, since
they will write to different local state stores.
2) if they belong to the same node, and
   a) the state stores are persistent, then there would be a risk of
contention; this is guarded by the state directory locks (as file locks), in
which case the new owner thread-4 should not be able to get hold of the
local state files.
   b) the state stores are in-memory, in which case that is fine since the
in-memory stores are kept separate as well.

In your case, 2.b), the issue is that the changelog would still be shared
between the two --- but note that this applies to case 1) as well. This
means that during that time the changelog is shared by two writers sending
records that interleave. If a tombstone that was intended for a record A is
written interleaved, with another record B in between, that tombstone would
effectively delete record B. The key here is that when we replay the
changelog, we replay it completely, following offset ordering.
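
To make this concrete, a hypothetical interleaving on the shared changelog
partition (offsets and values are illustrative, not taken from your logs)
could look like:

```
offset 10: Key1, A     (written before the rebalance)
offset 11: Key1, B     (written by the new owner, thread-4)
offset 12: Key1, null  (thread-2's tombstone, intended for A)
```

On restoration the changelog is replayed strictly in offset order, so the
tombstone at offset 12 wipes out B even though it was meant to delete A.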



On Thu, Apr 15, 2021 at 2:28 AM mangat rai <ma...@gmail.com> wrote:

> Guozhang,
>
> Yes, you are correct. We have our own group processor. I have more
> information now.
>
> 1. I added ThreadId in the data when the app persists into the changelog
> topic.
> 2. Thread-2 which was working with partition-0 had a timeout issue.
> 4. Thread-4 picked up this partition-0 as I can see its Id in the
> changelog.
> 5. *But then Thread-2 and Thread-4 both were writing into the partition-0
> of the changelog, that too for the same key.*
>
> So I was clearly able to see that two threads were overwriting data of one
> another into the state store leading to a corrupted state. This confirms my
> theory that it was an issue of concurrent update. This was something
> totally unexpected. I suspect that Thread-2 continues to persist its
> in-memory state, maybe because It wasn't stopped after the timeout
> exception. Is there a configuration possible in the Kafka stream which
> could lead to this?
>
> There was no network issue, our CPU was highly throttled by Kubernetes. We
> gave more resources, also decreased the fetch-size so we have more I/O to
> Cpu time ratio than before, and then there was no timeout issue, hence no
> reassignment and hence no corrupted state.
>
> I really appreciate your help here...
> Thanks!
> Mangat
>
>
> On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hey Mangat,
> >
> > A. With at least once, Streams does not make sure atomicity of 1) / 2);
> > with exactly once, atomicity is indeed guaranteed with transactional
> > messaging.
> >
> > B. If you are using processor API, then I'm assuming you did your own
> > group-by processor right? In that case, the partition key would just be
> the
> > record key when you are sending to the repartition topic.
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com> wrote:
> >
> > > Thanks again, that makes things clear. I still have some questions here
> > > then.
> > >
> > > A.  For each record we read, we do two updates
> > >       1. Changelog topic of the state store.
> > >       2. Output topic aka sink.
> > >       Does the Kafka stream app make sure that either both are
> committed
> > or
> > > neither?
> > >
> > > B.  Out Input topic actually has the as (a,b,c), but we partition with
> > only
> > > (a). We do this because we have different compaction requirements than
> > the
> > > partitions. It will still work as all (a,b,c) records will go to the
> same
> > > partition. Now in aggregation, we group by (a,b,c). In such case what
> > will
> > > be the partition key for the changelog topic?
> > >
> > > Note that we use low-level processor API and don't commit ourselves.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > >
> > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > >
> > > > Hi Mangat,
> > > >
> > > > Please see my replies inline below.
> > > >
> > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com>
> > wrote:
> > > >
> > > > > @Guozhang Wang
> > > > >
> > > > > Thanks for the reply.  Indeed I am finding it difficult to explain
> > this
> > > > > state. I checked the code many times. There can be a bug but I fail
> > to
> > > > see
> > > > > it. There are several things about the Kafka streams that I don't
> > > > > understand, which makes it harder for me to reason.
> > > > >
> > > > > 1. What is the partition key for the changelog topics? Is it the
> same
> > > as
> > > > > the Input key or the state store key? Or maybe the thread specifies
> > the
> > > > > partition as it knows the input partition it is subscribed to? If
> the
> > > > input
> > > > > topic and state store are differently partitioned then we can
> explain
> > > the
> > > > > issue here.
> > > > >
> > > >
> > > > In Kafka Stream's changelog, the "partition key" of Kafka messages is
> > the
> > > > same as the "message key" itself. And the message key is the same as
> > the
> > > > state store key.
> > > >
> > > > Since the state store here should be storing the running aggregate,
> it
> > > > means that the partition key is the same as the aggregated key.
> > > >
> > > > If you are doing a group-by aggregation here, where the group-by keys
> > are
> > > > different from the source input topic's keys, hence the state store
> > keys
> > > > would be different with the input topic keys.
> > > >
> > > >
> > > > > 2. Is there a background thread to persist in the state store when
> > > > caching
> > > > > is disabled? When will the app commit the log for the input topic?
> Is
> > > it
> > > > > when sink writes into the output topic or when the state store
> writes
> > > > into
> > > > > the changelog topic? Because, if the app commits the record before
> > the
> > > > data
> > > > > was written to changelog topic then we can again explain this state
> > > > >
> > > > > The commit happens *after* the local state store, as well as the
> > > > changelog records sent by the Streams' producers, have been flushed.
> > I.e.
> > > > if there's a failure in between, you would re-process some source
> > records
> > > > and hence cause duplicates, but no data loss (a.k.a. the
> at_least_once
> > > > semantics).
> > > >
> > > >
> > > >
> > > > > >You may also consider upgrading to 2.6.x or higher version and see
> > if
> > > > this
> > > > > issue goes away.
> > > > > Do you mean the client or the Kafka broker? I will be upgrading the
> > > > client
> > > > > to 2.7.0 soon.
> > > > >
> > > > > I meant the client.
> > > >
> > > >
> > > > > Sadly looking into the timestamp will not help much as we use some
> > > > business
> > > > > time field to set the record timestamp. If I am right, there is no
> > way
> > > > now
> > > > > to know that when a Producer wrote a record in a Kafka topic.
> > > > >
> > > > > Regards,
> > > > > Mangat
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hello Mangat,
> > > > > >
> > > > > > With at least once, although some records maybe processed
> multiple
> > > > times
> > > > > > their process ordering should not be violated, so what you
> observed
> > > is
> > > > > not
> > > > > > expected. What caught my eyes are this section in your output
> > > > changelogs
> > > > > > (high-lighted):
> > > > > >
> > > > > > Key1, V1
> > > > > > Key1, null
> > > > > > Key1, V1
> > > > > > Key1, null  (processed again)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > >
> > > > > > *Key1, V1Key1,V2*
> > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > reprocessed
> > > > > V1
> > > > > > again due to reassignment)
> > > > > >
> > > > > > They seem to be the result of first receiving a tombstone which
> > > removes
> > > > > V1
> > > > > > and then a new record that adds V2. However, since caching is
> > > disabled
> > > > > you
> > > > > > should get
> > > > > >
> > > > > > *Key1,V1*
> > > > > > *Key1,null*
> > > > > > *Key1,V2*
> > > > > >
> > > > > > instead; without the actual code snippet I cannot tell more
> what's
> > > > > > happening here. If you can look into the logs you can record each
> > > time
> > > > > when
> > > > > > partition migrates, how many records from the changelog was
> > replayed
> > > to
> > > > > > restore the store, and from which offset on the input topic does
> > > > Streams
> > > > > > resume processing. You may also consider upgrading to 2.6.x or
> > higher
> > > > > > version and see if this issue goes away.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hey,
> > > > > > >
> > > > > > > We have the following setup in our infrastructure.
> > > > > > >
> > > > > > >    1. Kafka - 2.5.1
> > > > > > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1
> > > library
> > > > > > >    3. Low level processor API is used with *atleast-once*
> > semantics
> > > > > > >    4. State stores are *in-memory* with *caching disabled* and
> > > > > *changelog
> > > > > > >    enabled*
> > > > > > >
> > > > > > >
> > > > > > > Is it possible that during state replication and partition
> > > > > reassignment,
> > > > > > > the input data is not always applied to the state store?
> > > > > > >
> > > > > > > 1. Let's say the input topic is having records like following
> > > > > > >
> > > > > > > ```
> > > > > > > Key1, V1
> > > > > > > Key1, null (tombstone)
> > > > > > > Key1, V2
> > > > > > > Key1, null
> > > > > > > Key1, V3
> > > > > > > Key1, V4
> > > > > > > ```
> > > > > > > 2. The app has an aggregation function which takes these record
> > and
> > > > > > update
> > > > > > > the state store so that changelog shall be
> > > > > > >
> > > > > > > ```
> > > > > > > Key1, V1
> > > > > > > Key1, null (tombstone)
> > > > > > > Key1, V2
> > > > > > > Key1, null
> > > > > > > Key1, V3
> > > > > > > Key1, V3 + V4
> > > > > > > ```
> > > > > > > Let's say the partition responsible for processing the above
> key
> > > was
> > > > > > > several times reallocated to different threads due to some
> infra
> > > > issues
> > > > > > we
> > > > > > > are having(in Kubernetes where we run the app, not the Kafka
> > > > cluster).
> > > > > > >
> > > > > > > I see the following record in the changelogs
> > > > > > >
> > > > > > > ```
> > > > > > > Key1, V1
> > > > > > > Key1, null
> > > > > > > Key1, V1
> > > > > > > Key1, null  (processed again)
> > > > > > > Key1, V2
> > > > > > > Key1, null
> > > > > > > Key1, V1
> > > > > > > Key1,V2
> > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > reprocessed
> > > > > > V1
> > > > > > > again due to reassignment)
> > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1
> > tombstone
> > > > > > should
> > > > > > > have been applied also!!)
> > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > Key1,V1
> > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > > ```
> > > > > > >
> > > > > > > If you see this means several things
> > > > > > > 1. The state is always correctly applied locally (in developer
> > > > laptop),
> > > > > > > where there were no reassignments.
> > > > > > > 2. The records are processed multiple times, which is
> > > understandable
> > > > as
> > > > > > we
> > > > > > > have at least symantics here.
> > > > > > > 3. As long as we re-apply the same events in the same orders we
> > are
> > > > > > golden
> > > > > > > but looks like some records are skipped, but here it looks as
> if
> > we
> > > > > have
> > > > > > > multiple consumers reading and update the same topics, leading
> to
> > > > race
> > > > > > > conditions.
> > > > > > >
> > > > > > > Is there any way, Kafka streams' state replication could lead
> to
> > > > such a
> > > > > > > race condition?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mangat
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Kafka Stream: State replication seems unpredictable.

Posted by mangat rai <ma...@gmail.com>.
Guozhang,

Yes, you are correct. We have our own group processor. I have more
information now.

1. I added the ThreadId to the data when the app persists into the
changelog topic (roughly as sketched below).
2. Thread-2, which was working with partition-0, had a timeout issue.
3. Thread-4 picked up this partition-0, as I can see its Id in the
changelog.
4. *But then Thread-2 and Thread-4 were both writing into partition-0
of the changelog, and for the same key.*
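
For reference, the thread tagging was done roughly like this (a minimal
sketch, not our actual processor; the store name and the string value
format are placeholders):

```
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TaggingProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("agg-store"); // placeholder name
    }

    @Override
    public void process(String key, String value) {
        // prefix each value with the writing thread's name so the changelog
        // records show which stream thread produced them
        String tagged = Thread.currentThread().getName() + "|" + value;
        store.put(key, tagged);
        context().forward(key, tagged);
    }
}
```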

So I was clearly able to see that two threads were overwriting one
another's data in the state store, leading to a corrupted state. This
confirms my theory that it was a concurrent-update issue, which was
something totally unexpected. I suspect that Thread-2 continued to persist
its in-memory state, maybe because it wasn't stopped after the timeout
exception. Is there a configuration in Kafka Streams which could lead to
this?

There was no network issue; our CPU was heavily throttled by Kubernetes. We
gave it more resources and also decreased the fetch size so that the
I/O-to-CPU-time ratio is higher than before. After that there were no
timeouts, hence no reassignment and hence no corrupted state.
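
The tuning was along these lines (a sketch with illustrative names and
values; I am not quoting our exact production config here):

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// smaller fetches and fewer records per poll, so each poll loop does less
// work and the thread stays within max.poll.interval.ms (values illustrative)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 262144);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
```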

I really appreciate your help here...
Thanks!
Mangat


On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hey Mangat,
>
> A. With at least once, Streams does not make sure atomicity of 1) / 2);
> with exactly once, atomicity is indeed guaranteed with transactional
> messaging.
>
> B. If you are using processor API, then I'm assuming you did your own
> group-by processor right? In that case, the partition key would just be the
> record key when you are sending to the repartition topic.
>
>
> Guozhang
>
>
>
>
> On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com> wrote:
>
> > Thanks again, that makes things clear. I still have some questions here
> > then.
> >
> > A.  For each record we read, we do two updates
> >       1. Changelog topic of the state store.
> >       2. Output topic aka sink.
> >       Does the Kafka stream app make sure that either both are committed
> or
> > neither?
> >
> > B.  Out Input topic actually has the as (a,b,c), but we partition with
> only
> > (a). We do this because we have different compaction requirements than
> the
> > partitions. It will still work as all (a,b,c) records will go to the same
> > partition. Now in aggregation, we group by (a,b,c). In such case what
> will
> > be the partition key for the changelog topic?
> >
> > Note that we use low-level processor API and don't commit ourselves.
> >
> > Regards,
> > Mangat
> >
> >
> >
> >
> > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Hi Mangat,
> > >
> > > Please see my replies inline below.
> > >
> > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com>
> wrote:
> > >
> > > > @Guozhang Wang
> > > >
> > > > Thanks for the reply.  Indeed I am finding it difficult to explain
> this
> > > > state. I checked the code many times. There can be a bug but I fail
> to
> > > see
> > > > it. There are several things about the Kafka streams that I don't
> > > > understand, which makes it harder for me to reason.
> > > >
> > > > 1. What is the partition key for the changelog topics? Is it the same
> > as
> > > > the Input key or the state store key? Or maybe the thread specifies
> the
> > > > partition as it knows the input partition it is subscribed to? If the
> > > input
> > > > topic and state store are differently partitioned then we can explain
> > the
> > > > issue here.
> > > >
> > >
> > > In Kafka Stream's changelog, the "partition key" of Kafka messages is
> the
> > > same as the "message key" itself. And the message key is the same as
> the
> > > state store key.
> > >
> > > Since the state store here should be storing the running aggregate, it
> > > means that the partition key is the same as the aggregated key.
> > >
> > > If you are doing a group-by aggregation here, where the group-by keys
> are
> > > different from the source input topic's keys, hence the state store
> keys
> > > would be different with the input topic keys.
> > >
> > >
> > > > 2. Is there a background thread to persist in the state store when
> > > caching
> > > > is disabled? When will the app commit the log for the input topic? Is
> > it
> > > > when sink writes into the output topic or when the state store writes
> > > into
> > > > the changelog topic? Because, if the app commits the record before
> the
> > > data
> > > > was written to changelog topic then we can again explain this state
> > > >
> > > > The commit happens *after* the local state store, as well as the
> > > changelog records sent by the Streams' producers, have been flushed.
> I.e.
> > > if there's a failure in between, you would re-process some source
> records
> > > and hence cause duplicates, but no data loss (a.k.a. the at_least_once
> > > semantics).
> > >
> > >
> > >
> > > > >You may also consider upgrading to 2.6.x or higher version and see
> if
> > > this
> > > > issue goes away.
> > > > Do you mean the client or the Kafka broker? I will be upgrading the
> > > client
> > > > to 2.7.0 soon.
> > > >
> > > > I meant the client.
> > >
> > >
> > > > Sadly looking into the timestamp will not help much as we use some
> > > business
> > > > time field to set the record timestamp. If I am right, there is no
> way
> > > now
> > > > to know that when a Producer wrote a record in a Kafka topic.
> > > >
> > > > Regards,
> > > > Mangat
> > > >
> > > >
> > > >
> > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > >
> > > > > Hello Mangat,
> > > > >
> > > > > With at least once, although some records maybe processed multiple
> > > times
> > > > > their process ordering should not be violated, so what you observed
> > is
> > > > not
> > > > > expected. What caught my eyes are this section in your output
> > > changelogs
> > > > > (high-lighted):
> > > > >
> > > > > Key1, V1
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1, null  (processed again)
> > > > > Key1, V2
> > > > > Key1, null
> > > > >
> > > > > *Key1, V1Key1,V2*
> > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > reprocessed
> > > > V1
> > > > > again due to reassignment)
> > > > >
> > > > > They seem to be the result of first receiving a tombstone which
> > removes
> > > > V1
> > > > > and then a new record that adds V2. However, since caching is
> > disabled
> > > > you
> > > > > should get
> > > > >
> > > > > *Key1,V1*
> > > > > *Key1,null*
> > > > > *Key1,V2*
> > > > >
> > > > > instead; without the actual code snippet I cannot tell more what's
> > > > > happening here. If you can look into the logs you can record each
> > time
> > > > when
> > > > > partition migrates, how many records from the changelog was
> replayed
> > to
> > > > > restore the store, and from which offset on the input topic does
> > > Streams
> > > > > resume processing. You may also consider upgrading to 2.6.x or
> higher
> > > > > version and see if this issue goes away.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > We have the following setup in our infrastructure.
> > > > > >
> > > > > >    1. Kafka - 2.5.1
> > > > > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1
> > library
> > > > > >    3. Low level processor API is used with *atleast-once*
> semantics
> > > > > >    4. State stores are *in-memory* with *caching disabled* and
> > > > *changelog
> > > > > >    enabled*
> > > > > >
> > > > > >
> > > > > > Is it possible that during state replication and partition
> > > > reassignment,
> > > > > > the input data is not always applied to the state store?
> > > > > >
> > > > > > 1. Let's say the input topic is having records like following
> > > > > >
> > > > > > ```
> > > > > > Key1, V1
> > > > > > Key1, null (tombstone)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > > Key1, V3
> > > > > > Key1, V4
> > > > > > ```
> > > > > > 2. The app has an aggregation function which takes these record
> and
> > > > > update
> > > > > > the state store so that changelog shall be
> > > > > >
> > > > > > ```
> > > > > > Key1, V1
> > > > > > Key1, null (tombstone)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > > Key1, V3
> > > > > > Key1, V3 + V4
> > > > > > ```
> > > > > > Let's say the partition responsible for processing the above key
> > was
> > > > > > several times reallocated to different threads due to some infra
> > > issues
> > > > > we
> > > > > > are having(in Kubernetes where we run the app, not the Kafka
> > > cluster).
> > > > > >
> > > > > > I see the following record in the changelogs
> > > > > >
> > > > > > ```
> > > > > > Key1, V1
> > > > > > Key1, null
> > > > > > Key1, V1
> > > > > > Key1, null  (processed again)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > > Key1, V1
> > > > > > Key1,V2
> > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > reprocessed
> > > > > V1
> > > > > > again due to reassignment)
> > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1
> tombstone
> > > > > should
> > > > > > have been applied also!!)
> > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > Key1,V1
> > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > ```
> > > > > >
> > > > > > If you see this means several things
> > > > > > 1. The state is always correctly applied locally (in developer
> > > laptop),
> > > > > > where there were no reassignments.
> > > > > > 2. The records are processed multiple times, which is
> > understandable
> > > as
> > > > > we
> > > > > > have at least symantics here.
> > > > > > 3. As long as we re-apply the same events in the same orders we
> are
> > > > > golden
> > > > > > but looks like some records are skipped, but here it looks as if
> we
> > > > have
> > > > > > multiple consumers reading and update the same topics, leading to
> > > race
> > > > > > conditions.
> > > > > >
> > > > > > Is there any way, Kafka streams' state replication could lead to
> > > such a
> > > > > > race condition?
> > > > > >
> > > > > > Regards,
> > > > > > Mangat
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hey Mangat,

A. With at-least-once, Streams does not guarantee atomicity of 1) / 2);
with exactly-once, atomicity is indeed guaranteed with transactional
messaging.

B. If you are using the Processor API, then I'm assuming you wrote your own
group-by processor, right? In that case, the partition key would just be
the record key when you are sending to the repartition topic.
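
For example, a bare-bones Processor API topology with a hand-rolled
repartition step could look like the sketch below (the processor classes
and topic names are hypothetical placeholders, not your actual code). The
sink's default partitioner hashes the record key, so whatever key your
group-by processor forwards becomes the partition key of the repartition
topic:

```
import org.apache.kafka.streams.Topology;

Topology topology = new Topology();
topology.addSource("source", "input-topic");                           // keyed by (a)
topology.addProcessor("group-by", MyGroupByProcessor::new, "source");  // re-keys to (a,b,c); hypothetical class
topology.addSink("repartition-sink", "app-repartition", "group-by");   // partitioned by the new key
topology.addSource("repartitioned", "app-repartition");
topology.addProcessor("aggregate", MyAggregateProcessor::new, "repartitioned"); // hypothetical class
```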


Guozhang




On Thu, Apr 8, 2021 at 9:00 AM mangat rai <ma...@gmail.com> wrote:

> Thanks again, that makes things clear. I still have some questions here
> then.
>
> A.  For each record we read, we do two updates
>       1. Changelog topic of the state store.
>       2. Output topic aka sink.
>       Does the Kafka stream app make sure that either both are committed or
> neither?
>
> B.  Out Input topic actually has the as (a,b,c), but we partition with only
> (a). We do this because we have different compaction requirements than the
> partitions. It will still work as all (a,b,c) records will go to the same
> partition. Now in aggregation, we group by (a,b,c). In such case what will
> be the partition key for the changelog topic?
>
> Note that we use low-level processor API and don't commit ourselves.
>
> Regards,
> Mangat
>
>
>
>
> On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Mangat,
> >
> > Please see my replies inline below.
> >
> > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com> wrote:
> >
> > > @Guozhang Wang
> > >
> > > Thanks for the reply.  Indeed I am finding it difficult to explain this
> > > state. I checked the code many times. There can be a bug but I fail to
> > see
> > > it. There are several things about the Kafka streams that I don't
> > > understand, which makes it harder for me to reason.
> > >
> > > 1. What is the partition key for the changelog topics? Is it the same
> as
> > > the Input key or the state store key? Or maybe the thread specifies the
> > > partition as it knows the input partition it is subscribed to? If the
> > input
> > > topic and state store are differently partitioned then we can explain
> the
> > > issue here.
> > >
> >
> > In Kafka Stream's changelog, the "partition key" of Kafka messages is the
> > same as the "message key" itself. And the message key is the same as the
> > state store key.
> >
> > Since the state store here should be storing the running aggregate, it
> > means that the partition key is the same as the aggregated key.
> >
> > If you are doing a group-by aggregation here, where the group-by keys are
> > different from the source input topic's keys, hence the state store keys
> > would be different with the input topic keys.
> >
> >
> > > 2. Is there a background thread to persist in the state store when
> > caching
> > > is disabled? When will the app commit the log for the input topic? Is
> it
> > > when sink writes into the output topic or when the state store writes
> > into
> > > the changelog topic? Because, if the app commits the record before the
> > data
> > > was written to changelog topic then we can again explain this state
> > >
> > > The commit happens *after* the local state store, as well as the
> > changelog records sent by the Streams' producers, have been flushed. I.e.
> > if there's a failure in between, you would re-process some source records
> > and hence cause duplicates, but no data loss (a.k.a. the at_least_once
> > semantics).
> >
> >
> >
> > > >You may also consider upgrading to 2.6.x or higher version and see if
> > this
> > > issue goes away.
> > > Do you mean the client or the Kafka broker? I will be upgrading the
> > client
> > > to 2.7.0 soon.
> > >
> > > I meant the client.
> >
> >
> > > Sadly looking into the timestamp will not help much as we use some
> > business
> > > time field to set the record timestamp. If I am right, there is no way
> > now
> > > to know that when a Producer wrote a record in a Kafka topic.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > >
> > > > Hello Mangat,
> > > >
> > > > With at least once, although some records maybe processed multiple
> > times
> > > > their process ordering should not be violated, so what you observed
> is
> > > not
> > > > expected. What caught my eyes are this section in your output
> > changelogs
> > > > (high-lighted):
> > > >
> > > > Key1, V1
> > > > Key1, null
> > > > Key1, V1
> > > > Key1, null  (processed again)
> > > > Key1, V2
> > > > Key1, null
> > > >
> > > > *Key1, V1Key1,V2*
> > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> reprocessed
> > > V1
> > > > again due to reassignment)
> > > >
> > > > They seem to be the result of first receiving a tombstone which
> removes
> > > V1
> > > > and then a new record that adds V2. However, since caching is
> disabled
> > > you
> > > > should get
> > > >
> > > > *Key1,V1*
> > > > *Key1,null*
> > > > *Key1,V2*
> > > >
> > > > instead; without the actual code snippet I cannot tell more what's
> > > > happening here. If you can look into the logs you can record each
> time
> > > when
> > > > partition migrates, how many records from the changelog was replayed
> to
> > > > restore the store, and from which offset on the input topic does
> > Streams
> > > > resume processing. You may also consider upgrading to 2.6.x or higher
> > > > version and see if this issue goes away.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com>
> > wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > We have the following setup in our infrastructure.
> > > > >
> > > > >    1. Kafka - 2.5.1
> > > > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1
> library
> > > > >    3. Low level processor API is used with *atleast-once* semantics
> > > > >    4. State stores are *in-memory* with *caching disabled* and
> > > *changelog
> > > > >    enabled*
> > > > >
> > > > >
> > > > > Is it possible that during state replication and partition
> > > reassignment,
> > > > > the input data is not always applied to the state store?
> > > > >
> > > > > 1. Let's say the input topic is having records like following
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null (tombstone)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V3
> > > > > Key1, V4
> > > > > ```
> > > > > 2. The app has an aggregation function which takes these record and
> > > > update
> > > > > the state store so that changelog shall be
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null (tombstone)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V3
> > > > > Key1, V3 + V4
> > > > > ```
> > > > > Let's say the partition responsible for processing the above key
> was
> > > > > several times reallocated to different threads due to some infra
> > issues
> > > > we
> > > > > are having(in Kubernetes where we run the app, not the Kafka
> > cluster).
> > > > >
> > > > > I see the following record in the changelogs
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1, null  (processed again)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1,V2
> > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > reprocessed
> > > > V1
> > > > > again due to reassignment)
> > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone
> > > > should
> > > > > have been applied also!!)
> > > > > Key1, V2+V1 (it is back!!!)
> > > > > Key1,V1
> > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > ```
> > > > >
> > > > > If you see this means several things
> > > > > 1. The state is always correctly applied locally (in developer
> > laptop),
> > > > > where there were no reassignments.
> > > > > 2. The records are processed multiple times, which is
> understandable
> > as
> > > > we
> > > > > have at least symantics here.
> > > > > 3. As long as we re-apply the same events in the same orders we are
> > > > golden
> > > > > but looks like some records are skipped, but here it looks as if we
> > > have
> > > > > multiple consumers reading and update the same topics, leading to
> > race
> > > > > conditions.
> > > > >
> > > > > Is there any way, Kafka streams' state replication could lead to
> > such a
> > > > > race condition?
> > > > >
> > > > > Regards,
> > > > > Mangat
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Kafka Stream: State replication seems unpredictable.

Posted by mangat rai <ma...@gmail.com>.
Thanks again, that makes things clear. I still have some questions here
then.

A.  For each record we read, we do two updates:
      1. The changelog topic of the state store.
      2. The output topic, a.k.a. the sink.
      Does the Kafka Streams app make sure that either both are committed or
neither?

B.  Our input topic actually has the key (a,b,c), but we partition with only
(a). We do this because our compaction requirements differ from our
partitioning requirements. It will still work, as all (a,b,c) records will go
to the same partition. Now, in the aggregation, we group by (a,b,c). In such a
case, what will be the partition key for the changelog topic?
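
(For illustration only, here is a hypothetical producer-side partitioner
showing what we mean by keying records with (a,b,c) while partitioning on (a)
alone; this is not our production code.)

```
// Hypothetical sketch: records are keyed "a|b|c", but only the "a" part is
// hashed to pick the partition, so all (a,b,c) records with the same "a"
// land on the same partition while compaction still sees the composite key.
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class FieldAPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String a = key.toString().split("\\|", 2)[0];
        int numPartitions = cluster.partitionCountForTopic(topic);
        return Math.abs(a.hashCode() % numPartitions);
    }

    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
```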

Note that we use the low-level Processor API and don't commit ourselves.

Regards,
Mangat




On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Mangat,
>
> Please see my replies inline below.
>
> On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com> wrote:
>
> > @Guozhang Wang
> >
> > Thanks for the reply.  Indeed I am finding it difficult to explain this
> > state. I checked the code many times. There can be a bug but I fail to
> see
> > it. There are several things about the Kafka streams that I don't
> > understand, which makes it harder for me to reason.
> >
> > 1. What is the partition key for the changelog topics? Is it the same as
> > the Input key or the state store key? Or maybe the thread specifies the
> > partition as it knows the input partition it is subscribed to? If the
> input
> > topic and state store are differently partitioned then we can explain the
> > issue here.
> >
>
> In Kafka Stream's changelog, the "partition key" of Kafka messages is the
> same as the "message key" itself. And the message key is the same as the
> state store key.
>
> Since the state store here should be storing the running aggregate, it
> means that the partition key is the same as the aggregated key.
>
> If you are doing a group-by aggregation here, where the group-by keys are
> different from the source input topic's keys, hence the state store keys
> would be different with the input topic keys.
>
>
> > 2. Is there a background thread to persist in the state store when
> caching
> > is disabled? When will the app commit the log for the input topic? Is it
> > when sink writes into the output topic or when the state store writes
> into
> > the changelog topic? Because, if the app commits the record before the
> data
> > was written to changelog topic then we can again explain this state
> >
> > The commit happens *after* the local state store, as well as the
> changelog records sent by the Streams' producers, have been flushed. I.e.
> if there's a failure in between, you would re-process some source records
> and hence cause duplicates, but no data loss (a.k.a. the at_least_once
> semantics).
>
>
>
> > >You may also consider upgrading to 2.6.x or higher version and see if
> this
> > issue goes away.
> > Do you mean the client or the Kafka broker? I will be upgrading the
> client
> > to 2.7.0 soon.
> >
> > I meant the client.
>
>
> > Sadly looking into the timestamp will not help much as we use some
> business
> > time field to set the record timestamp. If I am right, there is no way
> now
> > to know that when a Producer wrote a record in a Kafka topic.
> >
> > Regards,
> > Mangat
> >
> >
> >
> > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Hello Mangat,
> > >
> > > With at least once, although some records maybe processed multiple
> times
> > > their process ordering should not be violated, so what you observed is
> > not
> > > expected. What caught my eyes are this section in your output
> changelogs
> > > (high-lighted):
> > >
> > > Key1, V1
> > > Key1, null
> > > Key1, V1
> > > Key1, null  (processed again)
> > > Key1, V2
> > > Key1, null
> > >
> > > *Key1, V1Key1,V2*
> > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed
> > V1
> > > again due to reassignment)
> > >
> > > They seem to be the result of first receiving a tombstone which removes
> > V1
> > > and then a new record that adds V2. However, since caching is disabled
> > you
> > > should get
> > >
> > > *Key1,V1*
> > > *Key1,null*
> > > *Key1,V2*
> > >
> > > instead; without the actual code snippet I cannot tell more what's
> > > happening here. If you can look into the logs you can record each time
> > when
> > > partition migrates, how many records from the changelog was replayed to
> > > restore the store, and from which offset on the input topic does
> Streams
> > > resume processing. You may also consider upgrading to 2.6.x or higher
> > > version and see if this issue goes away.
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com>
> wrote:
> > >
> > > > Hey,
> > > >
> > > > We have the following setup in our infrastructure.
> > > >
> > > >    1. Kafka - 2.5.1
> > > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1 library
> > > >    3. Low level processor API is used with *atleast-once* semantics
> > > >    4. State stores are *in-memory* with *caching disabled* and
> > *changelog
> > > >    enabled*
> > > >
> > > >
> > > > Is it possible that during state replication and partition
> > reassignment,
> > > > the input data is not always applied to the state store?
> > > >
> > > > 1. Let's say the input topic is having records like following
> > > >
> > > > ```
> > > > Key1, V1
> > > > Key1, null (tombstone)
> > > > Key1, V2
> > > > Key1, null
> > > > Key1, V3
> > > > Key1, V4
> > > > ```
> > > > 2. The app has an aggregation function which takes these record and
> > > update
> > > > the state store so that changelog shall be
> > > >
> > > > ```
> > > > Key1, V1
> > > > Key1, null (tombstone)
> > > > Key1, V2
> > > > Key1, null
> > > > Key1, V3
> > > > Key1, V3 + V4
> > > > ```
> > > > Let's say the partition responsible for processing the above key was
> > > > several times reallocated to different threads due to some infra
> issues
> > > we
> > > > are having(in Kubernetes where we run the app, not the Kafka
> cluster).
> > > >
> > > > I see the following record in the changelogs
> > > >
> > > > ```
> > > > Key1, V1
> > > > Key1, null
> > > > Key1, V1
> > > > Key1, null  (processed again)
> > > > Key1, V2
> > > > Key1, null
> > > > Key1, V1
> > > > Key1,V2
> > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> reprocessed
> > > V1
> > > > again due to reassignment)
> > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone
> > > should
> > > > have been applied also!!)
> > > > Key1, V2+V1 (it is back!!!)
> > > > Key1,V1
> > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > ```
> > > >
> > > > If you see this means several things
> > > > 1. The state is always correctly applied locally (in developer
> laptop),
> > > > where there were no reassignments.
> > > > 2. The records are processed multiple times, which is understandable
> as
> > > we
> > > > have at least symantics here.
> > > > 3. As long as we re-apply the same events in the same orders we are
> > > golden
> > > > but looks like some records are skipped, but here it looks as if we
> > have
> > > > multiple consumers reading and update the same topics, leading to
> race
> > > > conditions.
> > > >
> > > > Is there any way, Kafka streams' state replication could lead to
> such a
> > > > race condition?
> > > >
> > > > Regards,
> > > > Mangat
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mangat,

Please see my replies inline below.

On Thu, Apr 8, 2021 at 5:34 AM mangat rai <ma...@gmail.com> wrote:

> @Guozhang Wang
>
> Thanks for the reply.  Indeed I am finding it difficult to explain this
> state. I checked the code many times. There can be a bug but I fail to see
> it. There are several things about the Kafka streams that I don't
> understand, which makes it harder for me to reason.
>
> 1. What is the partition key for the changelog topics? Is it the same as
> the Input key or the state store key? Or maybe the thread specifies the
> partition as it knows the input partition it is subscribed to? If the input
> topic and state store are differently partitioned then we can explain the
> issue here.
>

In Kafka Streams' changelogs, the "partition key" of a Kafka message is the
same as the "message key" itself. And the message key is the same as the
state store key.

Since the state store here should be storing the running aggregate, it
means that the partition key is the same as the aggregated key.

If you are doing a group-by aggregation here, where the group-by keys are
different from the source input topic's keys, then the state store keys
would be different from the input topic keys.
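
As a sketch of that relationship (assuming a low-level Processor doing the
running aggregate; the store name and value handling below are placeholders),
whatever key is used in store.put() becomes the key of the changelog record,
and hence its partition key:

```
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregatingProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        // "agg-store" is a placeholder store name.
        store = (KeyValueStore<String, String>) context.getStateStore("agg-store");
    }

    @Override
    public void process(String groupKey, String value) {
        String current = store.get(groupKey);
        String updated = (current == null) ? value : current + "+" + value;
        // The changelog record for this put is keyed (and thus partitioned) by groupKey.
        store.put(groupKey, updated);
        context().forward(groupKey, updated);
    }
}
```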


> 2. Is there a background thread to persist in the state store when caching
> is disabled? When will the app commit the log for the input topic? Is it
> when sink writes into the output topic or when the state store writes into
> the changelog topic? Because, if the app commits the record before the data
> was written to changelog topic then we can again explain this state
>
> The commit happens *after* the local state store, as well as the
changelog records sent by the Streams' producers, have been flushed. I.e.
if there's a failure in between, you would re-process some source records
and hence cause duplicates, but no data loss (a.k.a. the at_least_once
semantics).
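
If you want to see that re-processing in your own logs, a small diagnostic
sketch (again assuming the low-level Processor API; the class is made up) is
to log the source topic/partition/offset of every record, so duplicates show
up as repeated offsets:

```
import org.apache.kafka.streams.processor.AbstractProcessor;

public class OffsetLoggingProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        // Repeated (topic, partition, offset) triples indicate re-processing.
        System.out.printf("processing key=%s from %s-%d @ offset %d%n",
                key, context().topic(), context().partition(), context().offset());
        context().forward(key, value);
    }
}
```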



> >You may also consider upgrading to 2.6.x or higher version and see if this
> issue goes away.
> Do you mean the client or the Kafka broker? I will be upgrading the client
> to 2.7.0 soon.
>
> I meant the client.


> Sadly looking into the timestamp will not help much as we use some business
> time field to set the record timestamp. If I am right, there is no way now
> to know that when a Producer wrote a record in a Kafka topic.
>
> Regards,
> Mangat
>
>
>
> On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Mangat,
> >
> > With at least once, although some records maybe processed multiple times
> > their process ordering should not be violated, so what you observed is
> not
> > expected. What caught my eyes are this section in your output changelogs
> > (high-lighted):
> >
> > Key1, V1
> > Key1, null
> > Key1, V1
> > Key1, null  (processed again)
> > Key1, V2
> > Key1, null
> >
> > *Key1, V1Key1,V2*
> > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed
> V1
> > again due to reassignment)
> >
> > They seem to be the result of first receiving a tombstone which removes
> V1
> > and then a new record that adds V2. However, since caching is disabled
> you
> > should get
> >
> > *Key1,V1*
> > *Key1,null*
> > *Key1,V2*
> >
> > instead; without the actual code snippet I cannot tell more what's
> > happening here. If you can look into the logs you can record each time
> when
> > partition migrates, how many records from the changelog was replayed to
> > restore the store, and from which offset on the input topic does Streams
> > resume processing. You may also consider upgrading to 2.6.x or higher
> > version and see if this issue goes away.
> >
> >
> > Guozhang
> >
> > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com> wrote:
> >
> > > Hey,
> > >
> > > We have the following setup in our infrastructure.
> > >
> > >    1. Kafka - 2.5.1
> > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1 library
> > >    3. Low level processor API is used with *atleast-once* semantics
> > >    4. State stores are *in-memory* with *caching disabled* and
> *changelog
> > >    enabled*
> > >
> > >
> > > Is it possible that during state replication and partition
> reassignment,
> > > the input data is not always applied to the state store?
> > >
> > > 1. Let's say the input topic is having records like following
> > >
> > > ```
> > > Key1, V1
> > > Key1, null (tombstone)
> > > Key1, V2
> > > Key1, null
> > > Key1, V3
> > > Key1, V4
> > > ```
> > > 2. The app has an aggregation function which takes these record and
> > update
> > > the state store so that changelog shall be
> > >
> > > ```
> > > Key1, V1
> > > Key1, null (tombstone)
> > > Key1, V2
> > > Key1, null
> > > Key1, V3
> > > Key1, V3 + V4
> > > ```
> > > Let's say the partition responsible for processing the above key was
> > > several times reallocated to different threads due to some infra issues
> > we
> > > are having(in Kubernetes where we run the app, not the Kafka cluster).
> > >
> > > I see the following record in the changelogs
> > >
> > > ```
> > > Key1, V1
> > > Key1, null
> > > Key1, V1
> > > Key1, null  (processed again)
> > > Key1, V2
> > > Key1, null
> > > Key1, V1
> > > Key1,V2
> > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed
> > V1
> > > again due to reassignment)
> > > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone
> > should
> > > have been applied also!!)
> > > Key1, V2+V1 (it is back!!!)
> > > Key1,V1
> > > Key1, V1 + V2 + V3 (This is the final state)!
> > > ```
> > >
> > > If you see this means several things
> > > 1. The state is always correctly applied locally (in developer laptop),
> > > where there were no reassignments.
> > > 2. The records are processed multiple times, which is understandable as
> > we
> > > have at least symantics here.
> > > 3. As long as we re-apply the same events in the same orders we are
> > golden
> > > but looks like some records are skipped, but here it looks as if we
> have
> > > multiple consumers reading and update the same topics, leading to race
> > > conditions.
> > >
> > > Is there any way, Kafka streams' state replication could lead to such a
> > > race condition?
> > >
> > > Regards,
> > > Mangat
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Re: Kafka Stream: State replication seems unpredictable.

Posted by mangat rai <ma...@gmail.com>.
@Guozhang Wang

Thanks for the reply.  Indeed I am finding it difficult to explain this
state. I checked the code many times. There can be a bug but I fail to see
it. There are several things about Kafka Streams that I don't
understand, which makes it harder for me to reason about this.

1. What is the partition key for the changelog topics? Is it the same as
the input key or the state store key? Or maybe the thread specifies the
partition, since it knows which input partition it is subscribed to? If the
input topic and the state store are partitioned differently, then that would
explain the issue here.
2. Is there a background thread that persists the state store when caching
is disabled? When will the app commit the offset for the input topic? Is it
when the sink writes into the output topic, or when the state store writes
into the changelog topic? Because if the app commits the record before the
data was written to the changelog topic, then that would again explain this
state.

>You may also consider upgrading to 2.6.x or higher version and see if this
issue goes away.
Do you mean the client or the Kafka broker? I will be upgrading the client
to 2.7.0 soon.

Sadly, looking into the timestamp will not help much, as we use a business-time
field to set the record timestamp. If I am right, there is no way now
to know when a producer wrote a record to a Kafka topic.

Regards,
Mangat



On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Mangat,
>
> With at least once, although some records maybe processed multiple times
> their process ordering should not be violated, so what you observed is not
> expected. What caught my eyes are this section in your output changelogs
> (high-lighted):
>
> Key1, V1
> Key1, null
> Key1, V1
> Key1, null  (processed again)
> Key1, V2
> Key1, null
>
> *Key1, V1Key1,V2*
> Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
> again due to reassignment)
>
> They seem to be the result of first receiving a tombstone which removes V1
> and then a new record that adds V2. However, since caching is disabled you
> should get
>
> *Key1,V1*
> *Key1,null*
> *Key1,V2*
>
> instead; without the actual code snippet I cannot tell more what's
> happening here. If you can look into the logs you can record each time when
> partition migrates, how many records from the changelog was replayed to
> restore the store, and from which offset on the input topic does Streams
> resume processing. You may also consider upgrading to 2.6.x or higher
> version and see if this issue goes away.
>
>
> Guozhang
>
> On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com> wrote:
>
> > Hey,
> >
> > We have the following setup in our infrastructure.
> >
> >    1. Kafka - 2.5.1
> >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1 library
> >    3. Low level processor API is used with *atleast-once* semantics
> >    4. State stores are *in-memory* with *caching disabled* and *changelog
> >    enabled*
> >
> >
> > Is it possible that during state replication and partition reassignment,
> > the input data is not always applied to the state store?
> >
> > 1. Let's say the input topic is having records like following
> >
> > ```
> > Key1, V1
> > Key1, null (tombstone)
> > Key1, V2
> > Key1, null
> > Key1, V3
> > Key1, V4
> > ```
> > 2. The app has an aggregation function which takes these record and
> update
> > the state store so that changelog shall be
> >
> > ```
> > Key1, V1
> > Key1, null (tombstone)
> > Key1, V2
> > Key1, null
> > Key1, V3
> > Key1, V3 + V4
> > ```
> > Let's say the partition responsible for processing the above key was
> > several times reallocated to different threads due to some infra issues
> we
> > are having(in Kubernetes where we run the app, not the Kafka cluster).
> >
> > I see the following record in the changelogs
> >
> > ```
> > Key1, V1
> > Key1, null
> > Key1, V1
> > Key1, null  (processed again)
> > Key1, V2
> > Key1, null
> > Key1, V1
> > Key1,V2
> > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed
> V1
> > again due to reassignment)
> > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone
> should
> > have been applied also!!)
> > Key1, V2+V1 (it is back!!!)
> > Key1,V1
> > Key1, V1 + V2 + V3 (This is the final state)!
> > ```
> >
> > If you see this means several things
> > 1. The state is always correctly applied locally (in developer laptop),
> > where there were no reassignments.
> > 2. The records are processed multiple times, which is understandable as
> we
> > have at least symantics here.
> > 3. As long as we re-apply the same events in the same orders we are
> golden
> > but looks like some records are skipped, but here it looks as if we have
> > multiple consumers reading and update the same topics, leading to race
> > conditions.
> >
> > Is there any way, Kafka streams' state replication could lead to such a
> > race condition?
> >
> > Regards,
> > Mangat
> >
>
>
> --
> -- Guozhang
>

Re: Kafka Stream: State replication seems unpredictable.

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Mangat,

With at-least-once, although some records may be processed multiple times,
their processing order should not be violated, so what you observed is not
expected. What caught my eye is this section in your output changelogs
(highlighted):

Key1, V1
Key1, null
Key1, V1
Key1, null  (processed again)
Key1, V2
Key1, null

*Key1, V1*
*Key1, V2*
Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
again due to reassignment)

They seem to be the result of first receiving a tombstone which removes V1
and then a new record that adds V2. However, since caching is disabled you
should get

*Key1,V1*
*Key1,null*
*Key1,V2*

instead; without the actual code snippet I cannot tell more about what's
happening here. If you can look into the logs, you can record, each time a
partition migrates, how many records from the changelog were replayed to
restore the store, and from which offset on the input topic Streams resumes
processing. You may also consider upgrading to 2.6.x or a higher version to
see if this issue goes away.
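
One way to record each restoration, as suggested above, is a global
StateRestoreListener; here is a minimal sketch (class name and log format are
my own) that logs which changelog partition is replayed and how many records
were restored:

```
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("restore start: %s %s [%d..%d]%n",
                storeName, tp, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored) {
        // Called after each restored batch; useful for progress logging.
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
        System.out.printf("restore done: %s %s, %d records replayed%n",
                storeName, tp, totalRestored);
    }
}
```

It would be registered before starting the application with
kafkaStreams.setGlobalStateRestoreListener(new LoggingRestoreListener()).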


Guozhang

On Tue, Apr 6, 2021 at 8:38 AM mangat rai <ma...@gmail.com> wrote:

> Hey,
>
> We have the following setup in our infrastructure.
>
>    1. Kafka - 2.5.1
>    2. Apps use kafka streams `org.apache.kafka` version 2.5.1 library
>    3. Low level processor API is used with *atleast-once* semantics
>    4. State stores are *in-memory* with *caching disabled* and *changelog
>    enabled*
>
>
> Is it possible that during state replication and partition reassignment,
> the input data is not always applied to the state store?
>
> 1. Let's say the input topic is having records like following
>
> ```
> Key1, V1
> Key1, null (tombstone)
> Key1, V2
> Key1, null
> Key1, V3
> Key1, V4
> ```
> 2. The app has an aggregation function which takes these record and update
> the state store so that changelog shall be
>
> ```
> Key1, V1
> Key1, null (tombstone)
> Key1, V2
> Key1, null
> Key1, V3
> Key1, V3 + V4
> ```
> Let's say the partition responsible for processing the above key was
> several times reallocated to different threads due to some infra issues we
> are having(in Kubernetes where we run the app, not the Kafka cluster).
>
> I see the following record in the changelogs
>
> ```
> Key1, V1
> Key1, null
> Key1, V1
> Key1, null  (processed again)
> Key1, V2
> Key1, null
> Key1, V1
> Key1,V2
> Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
> again due to reassignment)
> Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone should
> have been applied also!!)
> Key1, V2+V1 (it is back!!!)
> Key1,V1
> Key1, V1 + V2 + V3 (This is the final state)!
> ```
>
> If you see this means several things
> 1. The state is always correctly applied locally (in developer laptop),
> where there were no reassignments.
> 2. The records are processed multiple times, which is understandable as we
> have at least symantics here.
> 3. As long as we re-apply the same events in the same orders we are golden
> but looks like some records are skipped, but here it looks as if we have
> multiple consumers reading and update the same topics, leading to race
> conditions.
>
> Is there any way, Kafka streams' state replication could lead to such a
> race condition?
>
> Regards,
> Mangat
>


-- 
-- Guozhang