Posted to users@kafka.apache.org by Charles Devilliers <ch...@gmail.com> on 2020/07/29 22:00:56 UTC
Sharing of State Stores
Hello,
I have some rudimentary questions on state stores. My service is planned to
have two transformers, each listening to a different topic. Both topics
have the same number of partitions and the upstream producers to those
topics are consistent with respect to key schema. My question centers
around the fact that both transformers need to consult and update the same
persistent state store in order to make decisions with respect to record
processing. I am not implementing a custom key partitioner, I'm using the
default. Also, there is no re-keying done by either transformer.
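For concreteness, a minimal sketch of the setup described above, as I understand it (store name, topic names, and the counting logic are all hypothetical illustrations), wiring two transformers to one shared persistent store via the Processor API:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreTopology {
    static final String STORE = "decision-store"; // hypothetical store name

    // Example transformer that both consults and updates the shared store.
    static class CountingTransformer
            implements Transformer<String, String, KeyValue<String, Long>> {
        private KeyValueStore<String, Long> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore(STORE);
        }

        @Override
        public KeyValue<String, Long> transform(String key, String value) {
            Long seen = store.get(key);                    // consult shared state
            long next = (seen == null ? 0L : seen) + 1L;
            store.put(key, next);                          // update shared state
            return KeyValue.pair(key, next);
        }

        @Override
        public void close() {}
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Register the store once ...
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE),
                Serdes.String(), Serdes.Long()));
        // ... and connect it to both transformers by name. Sharing the store
        // is what places both transformers in the same sub-topology.
        builder.<String, String>stream("topic-a")
               .transform(CountingTransformer::new, STORE);
        builder.<String, String>stream("topic-b")
               .transform(CountingTransformer::new, STORE);
        return builder.build();
    }
}
```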
Given the above scenario, I have the following questions:
1) Will a given key always hash to the same kstream consumer group member
for both transformers? You can imagine why this is important given that
they share a state store. My concern is that rebalancing may occur, and
somehow the key space for one of the transformers is moved to another pod,
but not both.
2) If transformer A processes a record R for a given key K, and the shared
state store is updated at key K as a result of that processing, does the
second transformer B have access to the updated state store value as soon
as transformer A is done processing the record? (Assume the record is
updated with a state store put()).
I have been told that in order to ensure that the partition assignments are
consistent across pods, for both input topics, I have to do some exotic
merging of the kstreams that process the input topics, which feels strange
and wrong.
Are there any other constraints or considerations that involve sharing a
state store across transformers that I should be thinking about in my
architecture for this service, but didn't mention?
Thanks for clarifying.
Re: Sharing of State Stores
Posted by Bruno Cadonna <br...@confluent.io>.
Hi Charles,
Two transformers that share the same state store should end up in the
same sub-topology. A sub-topology is executed by as many tasks as there
are partitions in its input topics. Each task processes the records of
one input partition group (i.e., in your case, the same partition number
from both input topics). A task is assigned to exactly one stream thread
of a Kafka Streams client, and each stream thread is a member of the
consumer group.
1) As far as I understand your setup, records with the same key are
produced to the same partition, so a given key ends up in the same
partition of both input topics. Hence, the key will always be processed
by the same task, namely the one that executes the sub-topology
containing both transformers.
2) Since a task is executed by a single thread, the two transformers
access the state store sequentially, so the second transformer sees any
update the first one made with put(). Note that Streams tries to process
records from both partitions in timestamp order, but this is best-effort
and not guaranteed (see max.task.idle.ms). The timestamp order also
depends on the timestamp extractor you use.
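To make this concrete, a small self-contained sketch (hypothetical topic and store names; it needs the kafka-streams-test-utils artifact for TopologyTestDriver) that pipes one record per input topic for the same key and shows the second transformer observing the first one's put():

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreVisibilityDemo {
    static final String STORE = "decision-store"; // hypothetical store name

    // Counts how often a key was seen, across both input topics.
    static class CountingTransformer
            implements Transformer<String, String, KeyValue<String, Long>> {
        private KeyValueStore<String, Long> store;
        @SuppressWarnings("unchecked")
        @Override public void init(ProcessorContext ctx) {
            store = (KeyValueStore<String, Long>) ctx.getStateStore(STORE);
        }
        @Override public KeyValue<String, Long> transform(String k, String v) {
            Long seen = store.get(k);
            long next = (seen == null ? 0L : seen) + 1L;
            store.put(k, next);
            return KeyValue.pair(k, next);
        }
        @Override public void close() {}
    }

    public static List<KeyValue<String, Long>> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE),
                Serdes.String(), Serdes.Long()));
        builder.<String, String>stream("topic-a")
               .transform(CountingTransformer::new, STORE)
               .to("out", Produced.with(Serdes.String(), Serdes.Long()));
        builder.<String, String>stream("topic-b")
               .transform(CountingTransformer::new, STORE)
               .to("out", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-store-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());

        try (TopologyTestDriver driver =
                 new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> a = driver.createInputTopic(
                "topic-a", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> b = driver.createInputTopic(
                "topic-b", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "out", new StringDeserializer(), new LongDeserializer());

            a.pipeInput("K", "r1"); // transformer on topic-a stores count 1
            b.pipeInput("K", "r2"); // transformer on topic-b sees 1, stores 2
            return out.readKeyValuesToList();
        }
    }
}
```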
To check if both transformers are in the same sub-topology you can call
topology.describe().toString(). To visualize the topology you can use
https://zz85.github.io/kafka-streams-viz/.
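For example, a quick self-contained check along those lines (no-op transformers and hypothetical names, just to show the shape of the output):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.Stores;

public class DescribeCheck {
    // No-op transformer whose only purpose is to attach the store.
    static class NoOp implements ValueTransformer<String, String> {
        @Override public void init(ProcessorContext context) {}
        @Override public String transform(String value) { return value; }
        @Override public void close() {}
    }

    public static String describe() {
        String store = "decision-store"; // hypothetical store name
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(store),
                Serdes.String(), Serdes.String()));
        builder.<String, String>stream("topic-a")
               .transformValues(NoOp::new, store);
        builder.<String, String>stream("topic-b")
               .transformValues(NoOp::new, store);
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        // A single "Sub-topology: 0" listing both source topics
        // confirms that the transformers were merged as described.
        System.out.println(describe());
    }
}
```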
Best,
Bruno