You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Peter Cipov <pc...@twilio.com.INVALID> on 2022/08/18 14:26:58 UTC

Kafka Streams Topology State

Hello,

I am looking for  help regarding topologies and tasks in KS. Mostly where
this information is stored outside of KS app.

My case is upgrading the KS topology from v1 to v2, the topology is
different (f.e adding peek node in v2). I made a change in code, prepared a
jar and deployed.

The issue that I observed was when old and new versions of app are up and
running (by mistake). The task for peek node is missing in old nodes hence
deployment crashes.

My next steps were shutting down all nodes and deployment without old
nodes. But this did not help. New deployment crashed too and I was not able
to figure out why. The topology just stopped ingestion from the source
topic. No error in logs.

My hypothesis is that the former deployment of (KS  topology v1 and v2 at
once) leaves some sort of ephemeral state in the kafka cluster itself.

And here is my question. what kind of state except messages in repartition
and changelog topics, consumer offsets are stored by KS in kafka cluster ?

What can be the root cause from your point of view ?

Thank you
Peter

Re: Kafka Streams Topology State

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Hey Peter

Try clearing the local state -- if you have stateful tasks then by default
Streams will use rocksdb to
store records locally in directories specific to/named after that task.
This is presumably why you're
seeing errors related to "the task for peek node missing in old nodes"

You can delete the local state manually, which is generally recommended, or
by running `KafkaStreams#cleanup`
-- you just want to make sure to remove the #cleanup before you resume
running normally, in case of a restart for
whatever reason (since wiping out the local state can take a while to
restore from the changelogs)

Of course, just wiping out the local state alone will get you past any
problems related to that, but you might have
to fully reset the application (or might just have lost your current
data/state) if eg the changelog or repartition topic
names changed.

In general upgrading topologies is hard to do in a compatible way, but
there are a few tricks/features you can to
make it possible in some cases. For now we recommend naming all of your
operators to reduce the chance of
processor node names getting scrambled, and there's an experimental feature
which will allow you to break up
your topology into discrete "named topologies" which can be just a single
task, which then allows adding or removing
these named topologies without disrupting the others. It's still
technically not a public feature yet, however, so there's
no "public API" (you can still use it, it's just not public in the sense
that there's no contract for the API and it may change
across versions). Happy to expand on this more if you want but hopefully
the above is sufficient for now

cheers,
sophie

On Thu, Aug 18, 2022 at 7:28 AM Peter Cipov <pc...@twilio.com.invalid>
wrote:

> Hello,
>
> I am looking for  help regarding topologies and tasks in KS. Mostly where
> this information is stored outside of KS app.
>
> My case is upgrading the KS topology from v1 to v2, the topology is
> different (f.e adding peek node in v2). I made a change in code, prepared a
> jar and deployed.
>
> The issue that I observed was when old and new versions of app are up and
> running (by mistake). The task for peek node is missing in old nodes hence
> deployment crashes.
>
> My next steps were shutting down all nodes and deployment without old
> nodes. But this did not help. New deployment crashed too and I was not able
> to figure out why. The topology just stopped ingestion from the source
> topic. No error in logs.
>
> My hypothesis is that the former deployment of (KS  topology v1 and v2 at
> once) leaves some sort of ephemeral state in the kafka cluster itself.
>
> And here is my question. what kind of state except messages in repartition
> and changelog topics, consumer offsets are stored by KS in kafka cluster ?
>
> What can be the root cause from your point of view ?
>
> Thank you
> Peter
>