Posted to users@kafka.apache.org by Charles Devilliers <ch...@gmail.com> on 2020/07/29 22:00:56 UTC

Sharing of State Stores

Hello,

I have some rudimentary questions on state stores. My service is planned to
have two transformers, each listening to a different topic. Both topics
have the same number of partitions and the upstream producers to those
topics are consistent with respect to key schema. My question centers
around the fact that both transformers need to consult and update the same
persistent state store in order to make decisions with respect to record
processing. I am not implementing a custom key partitioner, I'm using the
default. Also, there is no re-keying done by either transformer.

Given the above scenario, I have the following questions:

1) Will a given key always hash to the same kstream consumer group member
for both transformers? You can imagine why this is important given that
they share a state store. My concern is that rebalancing may occur, and
somehow the key space for one of the transformers is moved to another pod,
but not both.

2) If transformer A processes a record R for a given key K, and the shared
state store is updated at key K as a result of that processing, does the
second transformer B have access to the updated state store value as soon
as transformer A is done processing the record? (Assume the record is
updated with a state store put()).

I have been told that in order to ensure that the partition assignments are
consistent across pods, for both input topics, I have to do some exotic
merging of the kstreams that process the input topics, which feels strange
and wrong.

Are there any other constraints or considerations that involve sharing a
state store across transformers that I should be thinking about in my
architecture for this service, but didn't mention?

Thanks for clarifying.

Re: Sharing of State Stores

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Charles,

Two transformers that share the same state store should end up in the 
same sub-topology. A sub-topology is executed by as many tasks as there 
are partitions in its input topics. Each task processes the records 
from one input partition group (i.e., the same partition from both 
input topics in your case). A task is assigned to exactly one stream 
thread on a Kafka Streams client, and each stream thread is a member of 
the consumer group.

1) As far as I understand your setup, the same key is always produced 
to the same partition. A given key will therefore end up in the same 
partition in both of your input topics, and will be processed by the 
same task, which executes the sub-topology that contains both 
transformers.
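
To see why this holds with the default partitioner: for keyed records 
it hashes the serialized key with murmur2 and takes the positive hash 
modulo the partition count, so the partition depends only on the key 
bytes and the number of partitions. Here is a self-contained sketch of 
that logic (the hash mirrors Kafka's Utils.murmur2; the key and 
partition count are made-up examples):

```java
import java.nio.charset.StandardCharsets;

public class DefaultPartitionSketch {

    // Port of Kafka's murmur2 hash (org.apache.kafka.common.utils.Utils),
    // reproduced here so the example runs without the client library.
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the trailing bytes; the fall-through is intentional.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Default partitioning for keyed records: positive hash mod partitions.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With equal partition counts, a key lands on the same partition
        // number in both topics, so both records reach the same task.
        int numPartitions = 8;      // same count for topic A and topic B
        String key = "order-123";   // hypothetical key
        System.out.println("topic A partition: " + partitionFor(key, numPartitions));
        System.out.println("topic B partition: " + partitionFor(key, numPartitions));
    }
}
```

Note this guarantee breaks if the two topics ever have different 
partition counts, or if a producer overrides the partitioner.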

2) Since the execution of a task is single-threaded, the transformers 
access the state store consecutively and always see each other's latest 
updates. Streams tries to process records from both partitions in 
timestamp order, but this is best-effort and not guaranteed (see 
max.task.idle.ms). The time order depends on the timestamp extractor 
you use.
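
That knob is set like any other Streams config property. A minimal 
sketch (the application id and bootstrap servers are placeholders for 
your environment):

```java
import java.util.Properties;

public class StreamsConfigSketch {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "shared-store-service"); // placeholder
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        // Let a task wait up to 100 ms for records on an empty input
        // partition before processing the other partition, which improves
        // (but does not guarantee) cross-topic timestamp ordering.
        props.put("max.task.idle.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.task.idle.ms"));
    }
}
```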

To check if both transformers are in the same sub-topology you can call 
topology.describe().toString(). To visualize the topology you can use 
https://zz85.github.io/kafka-streams-viz/.
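
For example, the wiring might look like the sketch below (built against 
the 2.x Streams API; the topic names, store name, and the pass-through 
transformer are hypothetical, and the kafka-streams dependency must be 
on the classpath). Both stream() calls attach to the same store, so 
describe() should list both transformers under one sub-topology:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreTopology {

    // Minimal transformer that reads and updates the shared store.
    static class PassThrough
            implements Transformer<String, String, KeyValue<String, String>> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>)
                    context.getStateStore("decision-store");
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            store.put(key, value); // update visible to the other transformer
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {}
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("decision-store"),
                        Serdes.String(), Serdes.String()));

        builder.<String, String>stream("topic-a")
               .transform(PassThrough::new, "decision-store")
               .to("output-a");
        builder.<String, String>stream("topic-b")
               .transform(PassThrough::new, "decision-store")
               .to("output-b");

        Topology topology = builder.build();
        // Both transformers should appear under the same sub-topology.
        System.out.println(topology.describe().toString());
    }
}
```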

Best,
Bruno

On 30.07.20 00:00, Charles Devilliers wrote: