Posted to users@kafka.apache.org by "Schmidt-Dumont Georg (BCI/ESW17)" <Ge...@de.bosch.com.INVALID> on 2020/05/19 08:23:23 UTC

Question regarding Kafka Streams Global State Store

Good morning,

I have set up a Kafka Streams application with the following logic. The incoming messages are validated and transformed. The transformed messages are then published to a global state store via its source topic (global-store-topic) and to an additional topic (output-topic) for consumption by other applications further down the processing pipeline.

As part of the transformation I access the global state store in order to get the values from the previous message and use them in the transformation of the current message. The messages only contain changed values and these changes are merged with the complete data set before being sent on, hence I always hold the latest state in the global store in order to merge it with the incoming changed values.
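A minimal sketch of the merge step described above, assuming the delta semantics are "fields present in the incoming message overwrite the previous values; all other fields are kept". A `Map<String, Object>` stands in for `JSONObject`, and the class name `DeltaMerge` is hypothetical, not taken from the original application.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the original app): merges a delta that
// contains only changed fields into the previous full state.
final class DeltaMerge {
    private DeltaMerge() {}

    // Returns a new map: every field of the previous state (if any),
    // overwritten by every field present in the delta. Inputs are not modified.
    static Map<String, Object> merge(Map<String, Object> previous, Map<String, Object> delta) {
        Map<String, Object> merged = new HashMap<>();
        if (previous != null) {
            merged.putAll(previous); // start from the last known full state
        }
        merged.putAll(delta); // changed values win
        return merged;
    }
}
```

With a previous state of temperature=20, status=RUNNING and a delta of temperature=25, the merged state holds temperature=25 and status=RUNNING. The correctness of this step depends entirely on `previous` really being the latest state, which is the problem described next.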

Unfortunately, when I access the store in the transformation, I do not get the latest state. The store is not updated quickly enough, so I either get no values or values that do not represent the latest state.
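The following toy model, plain Java rather than Kafka code, illustrates why the read can be stale: the transformer writes to the store's source topic, but the global store only changes when a separate thread (in Kafka Streams, the dedicated global stream thread) consumes that topic. The queue, the class `GlobalStoreLagModel` and its `pollGlobalThread` method are hypothetical stand-ins for the topic and that thread.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, single-threaded model of the lag between producing to the
// global store's source topic and the global store reflecting that write.
// This is NOT Kafka's implementation.
final class GlobalStoreLagModel {
    // Stand-in for "global-store-topic": records produced but not yet consumed.
    private final Queue<Map.Entry<String, String>> topic = new ArrayDeque<>();
    // Stand-in for the global store as seen by the stream task.
    private final Map<String, String> globalStore = new HashMap<>();

    // The stream task "produces" its transformed record to the topic.
    void produce(String key, String value) {
        topic.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    // The stream task reads the store; it only sees what has already been applied.
    String read(String key) {
        return globalStore.get(key);
    }

    // Stand-in for the global thread applying up to maxRecords pending records.
    void pollGlobalThread(int maxRecords) {
        for (int i = 0; i < maxRecords && !topic.isEmpty(); i++) {
            Map.Entry<String, String> record = topic.poll();
            globalStore.put(record.getKey(), record.getValue());
        }
    }
}
```

If a second message for the same key is processed before `pollGlobalThread` has caught up, `read` returns the old value (or `null`), which matches the symptom described above.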

The following shows the build-up of my streams app:

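//imports (JSONObject and JSONObjectSerde are the application's own classes)
import java.util.Optional;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

//setup global state store
final KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore( "global-store" );
// A global store uses its source topic as its changelog, so logging must be disabled on the builder.
final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
        Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new JSONObjectSerde() )
              .withLoggingDisabled();
builder.addGlobalStore( storeBuilder, "global-store-topic", Consumed.with( Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );

//store processor (StoreProcessor implements org.apache.kafka.streams.processor.Processor<String, JSONObject>)

private KeyValueStore<String, JSONObject> stateStore;

@Override
@SuppressWarnings( "unchecked" )
public void init( final ProcessorContext context ) {
   stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore( "global-store" );
}

@Override
public void process( final String key, final JSONObject state ) {
   log.info( "Update state store for {}: {}.", key, state );
   // write the record consumed from global-store-topic into the global store
   stateStore.put( key, state );
}

@Override
public void close() {}

//streams setup

final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();

// ValueTransformer is assumed to be the application's own class implementing
// ValueTransformerWithKey (it reads the key), not Kafka's ValueTransformer interface.
final KStream<String, JSONObject> stream = builder.stream( "input-topic", Consumed.with( Serdes.String(), jsonObjectSerde ) )
                   .transformValues( ValueTransformer::new );

stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );
stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );

//global state store access in ValueTransformer

JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
                                   .orElse( new JSONObject() );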


I have set the producer property "acks" to "all".

I have tried to disable caching by setting "cache.max.bytes.buffering" to 0 and by disabling the cache on the store using ".withCachingDisabled()". I also tried setting the commit interval ("commit.interval.ms") to 0. None of this helped.
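For reference, the settings mentioned above could be expressed as follows. The keys are the literal names behind `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`, `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` and `StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG)`; the application ID, broker address and the class name `TriedStreamsConfig` are hypothetical placeholders. As a sketch under these assumptions, it also shows why the attempts could not help: none of these settings makes the global store update part of the same processing step, since the store is still filled asynchronously from its topic.

```java
import java.util.Properties;

// Hypothetical holder for the settings described above.
final class TriedStreamsConfig {
    private TriedStreamsConfig() {}

    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "example-app");       // placeholder (hypothetical)
        props.put("bootstrap.servers", "localhost:9092"); // placeholder (hypothetical)
        // Disable the record cache (StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG).
        props.put("cache.max.bytes.buffering", "0");
        // Commit as often as possible (StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).
        props.put("commit.interval.ms", "0");
        // The "producer." prefix routes the setting to the Streams-internal producers.
        props.put("producer.acks", "all");
        return props;
    }
}
```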

How can I set up a global state store that meets the requirements described in the scenario above?

Thank you!

Best regards

Mr. Georg Schmidt-Dumont
Bosch Connected Industry – BCI/ESW17
Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | www.bosch.com
Phone +49 711 811-49893  | Georg.Schmidt-Dumont@bosch.com

Registered office: Stuttgart, Registration court: Amtsgericht Stuttgart, HRB 14000;
Chairman of the Supervisory Board: Franz Fehrenbach; Managing Directors: Dr. Volkmar Denner,
Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. Markus Heyn, Dr. Dirk Hoheisel,
Christoph Kübel, Uwe Raschke, Peter Tyroller


Re: Question regarding Kafka Streams Global State Store

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Georg,

Great that you could answer your own question and I am glad that I could help.

I was just writing you a similar answer. Yes, the global state store
will eventually reflect your write but you do not know when. That is
the main issue for your use case. A local state store will immediately
contain your previous write, because it is local to your processing.
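A minimal sketch of the read-merge-write pattern with a store that is local to the processing task, as suggested above. It is a plain-Java model: the `HashMap` stands in for a local `KeyValueStore`, and `LocalStoreUpdater` with its merge function is hypothetical. In Kafka Streams 2.5 the real store would typically be registered with `StreamsBuilder.addStateStore(...)`, its name passed to `transformValues(..., "store-name")`, and the store obtained in `init()` via `context.getStateStore(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of a transformer with a local store: read the previous
// state, merge, and write it back in the same step, so the next record with
// the same key sees the result immediately.
final class LocalStoreUpdater<V> {
    private final Map<String, V> localStore = new HashMap<>(); // stand-in for a local KeyValueStore
    private final BinaryOperator<V> merger; // (previousOrNull, delta) -> merged

    LocalStoreUpdater(BinaryOperator<V> merger) {
        this.merger = merger;
    }

    // Returns the merged value that would be forwarded downstream.
    V transform(String key, V delta) {
        V previous = localStore.get(key);        // null for the first record of a key
        V merged = merger.apply(previous, delta);
        localStore.put(key, merged);             // synchronous write, visible to the next record
        return merged;
    }
}
```

In the scenario from this thread, the merge function would be the JSON merge the original transformer already performs; no round trip through a topic is needed before the next lookup.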

For more information on the global state store see
https://kafka.apache.org/25/documentation/streams/developer-guide/dsl-api.html#streams_concepts_globalktable
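As a rough illustration of where a global store does fit: read-mostly reference data that every instance needs in full, looked up by a key taken from the record value rather than the record key (the pattern behind `StreamsBuilder.globalTable(...)` joined via `KStream.join(GlobalKTable, KeyValueMapper, ValueJoiner)`). This is a plain-Java model under the assumption that the lookups can tolerate the table lagging behind its topic; `ReferenceDataEnricher` and the plant data in the usage are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of enrichment with replicated reference data.
final class ReferenceDataEnricher {
    // Full copy of the reference data, as every instance holds with a global store.
    private final Map<String, String> replicatedTable = new HashMap<>();

    // Stand-in for the global thread applying a record from the reference topic.
    void load(String key, String value) {
        replicatedTable.put(key, value);
    }

    // Looks up reference data by a key derived from the record value and appends it;
    // unmatched records pass through unchanged (similar to a left join).
    String enrich(String recordValue, Function<String, String> lookupKey) {
        String reference = replicatedTable.get(lookupKey.apply(recordValue));
        return reference == null ? recordValue : recordValue + "|" + reference;
    }
}
```

The difference from the scenario in this thread is that here the stream never writes to the table it reads from, so a slightly delayed update does not corrupt the result.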

Best,
Bruno

Re: Question regarding Kafka Streams Global State Store

Posted by "Schmidt-Dumont Georg (BCI/ESW17)" <Ge...@de.bosch.com.INVALID>.
Hi Bruno,

I just had a discussion with a colleague of mine about this and wanted to give you a quick update. Regarding the global state: I realize that keeping this state consistent in a distributed system is very difficult. My expectation was that, since it is a global state, Kafka takes care of the consistency and I can simply access the data. That expectation was a bit naïve. The state will probably be eventually consistent, which does not fit what I am trying to do. As you said, I should use a local store.

Regarding the question about the number of partitions in my previous mail, I think I have answered it myself. Ensuring that the messages have correct and consistent keys guarantees that all data for a specific key ends up in a single partition. It does not mean that a partition per key is required (which is what I first thought).
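A small sketch of that point: many keys share a fixed, small number of partitions, and a given key always lands in the same one. Kafka's default partitioner hashes the serialized key with murmur2; this sketch uses `Arrays.hashCode` as a stand-in, so the concrete partition numbers differ from Kafka's. `KeyToPartition` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of a deterministic key -> partition mapping.
final class KeyToPartition {
    private KeyToPartition() {}

    // Stand-in hash: Kafka's default partitioner uses murmur2 on the serialized key instead.
    static int partitionFor(String key, int numPartitions) {
        byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }
}
```

With, say, 1000 machine IDs and 6 partitions, every ID maps to one of the 6 partitions and always to the same one, so the partition count does not need to grow with the number of machines.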

Thanks again for your help!

Best regards

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci

Re: Question regarding Kafka Streams Global State Store

Posted by "Schmidt-Dumont Georg (BCI/ESW17)" <Ge...@de.bosch.com.INVALID>.
Hi Bruno,

The data I am processing comes from machines. The key is the identifier of the machine that produced a specific message. Currently only a couple of these machines produce data, but this number will increase considerably over the coming years. Is the limit on the number of partitions per topic and per cluster something we need to consider when using the keys as partitions, i.e. one partition per key?

Irrespective of whether I can use local state stores instead of a global state store, I would still like to understand the problem with using a global state store. It seems to me that I can only use it if the incoming messages are far enough apart in time for the state to be updated between the processing of individual messages. That seems to miss the point of having the store. Could you please explain what the purpose of the global state store is and when to use it?

Thanks!

Re: Question regarding Kafka Streams Global State Store

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Georg,

Local state stores in Kafka Streams are backed by a Kafka topic (a changelog
topic) by default. So, if the instance crashes, the local state store is
restored from the local state directory. If the local state directory is
empty or does not exist, the local state store is restored from the Kafka
topic. Local state stores are as resilient as global state stores.
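A minimal model of the "restore from the Kafka topic" case, assuming the usual changelog semantics: records are replayed in order, the last value per key wins, and a `null` value (tombstone) deletes the key. `ChangelogRestore` and its `Record` type are hypothetical; Kafka Streams' actual restoration also uses the local state directory and checkpoints, which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of rebuilding a key-value store from its changelog.
final class ChangelogRestore {
    private ChangelogRestore() {}

    // Hypothetical changelog record: key plus value (null = tombstone).
    static final class Record {
        final String key;
        final String value;

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the changelog in order; the result is the latest value per key.
    static Map<String, String> restore(List<Record> changelog) {
        Map<String, String> store = new HashMap<>();
        for (Record r : changelog) {
            if (r.value == null) {
                store.remove(r.key); // tombstone deletes the key
            } else {
                store.put(r.key, r.value); // later values overwrite earlier ones
            }
        }
        return store;
    }
}
```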

As far as I understand, you only look up previous records with the
same key. You do not need to have the global state available at each
instance to do this; having all records with the same key available is
sufficient. If your input topics are partitioned by key, records
with the same key will land on the same instance. That means your
local state store contains all records with the same key.
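A toy model of this routing argument: each instance keeps a local store only for the partitions it owns, and since a key always maps to the same partition, the instance that processes a key also holds that key's previous state. The hash and the `partition % instances` assignment are simplifications, not Kafka's partitioner or partition assignor, and `PartitionedInstances` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: partitions spread over instances, one local store per instance.
final class PartitionedInstances {
    private final int numPartitions;
    private final List<Map<String, String>> localStores = new ArrayList<>();

    PartitionedInstances(int numPartitions, int numInstances) {
        this.numPartitions = numPartitions;
        for (int i = 0; i < numInstances; i++) {
            localStores.add(new HashMap<>());
        }
    }

    // Stand-in hash, not Kafka's murmur2-based default partitioner.
    int partitionFor(String key) {
        return Math.floorMod(Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)), numPartitions);
    }

    // Simplified ownership: partition p is processed by instance p % numInstances.
    int instanceFor(String key) {
        return partitionFor(key) % localStores.size();
    }

    // Returns the key's previous value from the owning instance's local store,
    // then records the new value there.
    String process(String key, String value) {
        Map<String, String> store = localStores.get(instanceFor(key));
        String previous = store.get(key);
        store.put(key, value);
        return previous;
    }

    boolean holds(int instance, String key) {
        return localStores.get(instance).containsKey(key);
    }
}
```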

Best,
Bruno

> -----Original Message-----
> From: Bruno Cadonna <br...@confluent.io>
> Sent: Tuesday, 19 May 2020 10:52
> To: Users <us...@kafka.apache.org>
> Subject: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> From your description, I do not see why you need to use a global state instead of a local one. Are there any specific reasons for that? With a local state store you would have the previous record immediately available.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) <Ge...@de.bosch.com.invalid> wrote:
> >
> > Good morning,
> >
> > I have setup a Kafka Streams application with the following logic. The incoming messages are validated and transformed. The transformed messages are then published to a global state store via topic A as well as to an additional topic A for consumption by other applications further down the processing pipeline.
> >
> > As part of the transformation I access the global state store in order to get the values from the previous message and use them in the transformation of the current message. The messages only contain changed values and these changes are merged with the complete data set before being sent on, hence I always hold the latest state in the global store in order to merge it with the incoming changed values.
> >
> > Unfortunately, when I access the store in the transformation I do not get the latest state. The update of the store takes too long so when I access it in the transformation I either get no values or values which do not represent the latest state.
> >
> > The following shows the build-up of my streams app:
> >
> > //setup global state store
> > final KeyValueBytesStoreSupplier storeSupplier =
> > Stores.persistentKeyValueStore( “global-store” ); final
> > StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
> > Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new
> > JSONObjectSerde() ); builder.addGlobalStore( storeBuilder,
> > “global-store-topic”,  Consumed.with( Serdes.String(), new
> > JSONObjectSerde() ), StoreProcessor::new );
> >
> > //store processor
> >
> > private KeyValueStore<String, JSONObject> stateStore;
> >
> > @Override
> > public void init( final ProcessorContext context ) {
> >    stateStore = (KeyValueStore<String, JSONObject>)
> > context.getStateStore( “global-store” ); }
> >
> >
> >
> > @Override
> > public void process( final String key, final JSONObject state ) {
> >    log.info( "Update state store for {}: {}.", key, state );
> >    lastRecentStateStore.put( key, state ); }
> >
> >
> > //streams setup
> >
> > final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
> >
> > final KStream<String, JSONObject> stream = builder.stream( “input-topic”, Consumed.with( Serdes.String(), jsonObjectSerde ) )
> >
> >                    .transformValues( ValueTransformer::new )
> >
> >
> >
> > stream.to( “global-store-topic”, Produced.valueSerde( jsonObjectSerde ) );
> >
> > stream.to( “output-topic”, Produced.valueSerde( jsonObjectSerde ) );
> >
> > //global state store access in ValueTransformer
> >
> > JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
> >                                    .orElse( new JSONObject() );
> >
> >
> > I have set the producer property “acks” to “all”.
> >
> > I have tried to disable the caching by setting “cache.max.bytes.buffering” to 0 and by disabling the cache on the store using “.withCachingDisabled()”. I also tried setting the commit interval to 0. All without success.
> >
> > How can I set up a global state store that meets the requirements described in the scenario above?
> >
> > Thank you!
> >
> > Best regards
> >
> > Mr. Georg Schmidt-Dumont
> > Bosch Connected Industry – BCI/ESW17
> > Robert Bosch GmbH | P.O. Box 10 60 50 | 70049 Stuttgart | GERMANY | www.bosch.com
> > Phone +49 711 811-49893 | Georg.Schmidt-Dumont@bosch.com
> >
> > Registered office: Stuttgart; Registration court: Amtsgericht Stuttgart, HRB 14000;
> > Chairman of the Supervisory Board: Franz Fehrenbach; Board of Management: Dr. Volkmar Denner,
> > Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. Markus Heyn, Dr. Dirk Hoheisel,
> > Christoph Kübel, Uwe Raschke, Peter Tyroller
> >
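A likely explanation for the stale reads: in this topology the transformer never writes to the global store itself. Its output goes to global-store-topic, and the store only changes when the separate global stream thread consumes that record and StoreProcessor.process() runs. Producer acks, cache.max.bytes.buffering and commit.interval.ms can shorten that round trip, but they cannot make the next input record wait for it, so two updates for the same key that arrive close together can both read the same old state. The sketch below is a stdlib-only Java model of that race, not Kafka code: string maps stand in for JSONObject and for the store, a deque stands in for global-store-topic, and pollGlobalTopic() (a hypothetical name) stands in for the global thread catching up.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Stdlib-only model of the original topology (hypothetical, not Kafka code).
final class GlobalStoreLagModel {
    // Records produced to "global-store-topic" that the global thread has not consumed yet.
    private final Deque<Map.Entry<String, Map<String, String>>> globalStoreTopic = new ArrayDeque<>();
    // What the transformer can read: key -> full merged state.
    private final Map<String, Map<String, String>> globalStore = new HashMap<>();

    // Stream-thread side (the ValueTransformer): read the previous state, merge the
    // changed fields into it, and publish the result. The store is NOT written here.
    Map<String, String> transform(String key, Map<String, String> changes) {
        Map<String, String> merged = new HashMap<>(globalStore.getOrDefault(key, new HashMap<>()));
        merged.putAll(changes);
        globalStoreTopic.addLast(new AbstractMap.SimpleImmutableEntry<>(key, merged));
        return merged;
    }

    // Global-thread side (StoreProcessor.process()): apply every record consumed so far.
    void pollGlobalTopic() {
        while (!globalStoreTopic.isEmpty()) {
            Map.Entry<String, Map<String, String>> consumed = globalStoreTopic.pollFirst();
            globalStore.put(consumed.getKey(), consumed.getValue());
        }
    }

    // Helper that builds a field map from alternating names and values.
    static Map<String, String> fields(String... namesAndValues) {
        Map<String, String> result = new HashMap<>();
        for (int i = 0; i + 1 < namesAndValues.length; i += 2) {
            result.put(namesAndValues[i], namesAndValues[i + 1]);
        }
        return result;
    }
}
```

Calling transform() twice for the same key before pollGlobalTopic() shows the effect: the second call merges into an empty state, and once the queued records are applied the fields from the first change are overwritten, so the update is lost rather than merely delayed.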

Re: Question regarding Kafka Streams Global State Store

Posted by "Schmidt-Dumont Georg (BCI/ESW17)" <Ge...@de.bosch.com.INVALID>.
Hi Bruno,

Thanks for your quick reply!

I decided to use a global state store for two reasons. First, if the application crashes, the store is repopulated properly once the cause of the crash has been fixed and the app starts again, so I feel it gives me a certain resiliency. Second, we will be running multiple instances of the application, and a global state store makes the state available to all instances.
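Both concerns can also be met by a local store. In Kafka Streams, local key-value stores are by default backed by a changelog topic, so if the local copy is lost after a crash, it is rebuilt by replaying that topic. And because records with the same key land in the same partition, and each partition is processed by one instance at a time, an instance only needs the state for the keys it actually processes; this assumes the input topic is keyed by the same key used for the store lookup. The following stdlib-only Java model illustrates both points. ChangelogBackedStoreModel is a hypothetical name, the list stands in for the changelog topic, and partitionFor() uses String.hashCode() as a simplified stand-in for the default partitioner, which actually applies murmur2 to the serialized key.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model (hypothetical, not Kafka code) of a changelog-backed local store.
final class ChangelogBackedStoreModel {
    private final Map<String, String> store = new HashMap<>();
    // Stand-in for the store's changelog topic.
    private final List<Map.Entry<String, String>> changelog;

    ChangelogBackedStoreModel(List<Map.Entry<String, String>> changelog) {
        this.changelog = changelog;
    }

    // Every write goes to the local store and is also appended to the changelog.
    void put(String key, String value) {
        store.put(key, value);
        changelog.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    String get(String key) {
        return store.get(key);
    }

    // After the local copy is lost, replaying the changelog rebuilds it.
    // Replay does not append to the changelog again.
    static ChangelogBackedStoreModel restore(List<Map.Entry<String, String>> changelog) {
        ChangelogBackedStoreModel restored = new ChangelogBackedStoreModel(changelog);
        for (Map.Entry<String, String> entry : changelog) {
            restored.store.put(entry.getKey(), entry.getValue()); // latest value per key wins
        }
        return restored;
    }

    // Same key -> same partition -> same instance. Simplified stand-in, NOT murmur2.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

The restore step applies the changelog in order, so the last value written for each key wins, which mirrors how a compacted changelog topic is replayed.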

I am fairly new to Kafka and Kafka Streams, so I am very much open to suggestions on better ways to handle the flow I need.

Best regards

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci



-----Original Message-----
From: Bruno Cadonna <br...@confluent.io> 
Sent: Tuesday, 19 May 2020 10:52
To: Users <us...@kafka.apache.org>
Subject: Re: Question regarding Kafka Streams Global State Store

Hi Georg,

From your description, I do not see why you need to use a global state instead of a local one. Are there any specific reasons for that? With a local state store you would have the previous record immediately available.

Best,
Bruno

Re: Question regarding Kafka Streams Global State Store

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Georg,

From your description, I do not see why you need to use a global state
instead of a local one. Are there any specific reasons for that? With
a local state store you would have the previous record immediately
available.
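A minimal sketch of that suggestion, modelled with the Java standard library only: a HashMap stands in for a local KeyValueStore<String, JSONObject> attached to the transformValues() step, and string maps stand in for JSONObject. The key point is that the merged state is written back inside the same transform() call, so the next record for the same key reads it immediately, with no round trip through a topic. In the real topology the store would presumably be registered with StreamsBuilder.addStateStore() and its name passed to transformValues(), with a ValueTransformerWithKey giving the transformer access to the key; LocalStoreMerger itself is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only model (hypothetical, not Kafka code) of a transformer that owns a local store.
final class LocalStoreMerger {
    // Stand-in for a local KeyValueStore<String, JSONObject>: key -> full merged state.
    private final Map<String, Map<String, String>> localStore = new HashMap<>();

    // Read the previous full state, merge the changed fields, and write the result
    // back before returning, so the next record for this key sees it immediately.
    Map<String, String> transform(String key, Map<String, String> changes) {
        Map<String, String> merged = new HashMap<>(localStore.getOrDefault(key, new HashMap<>()));
        merged.putAll(changes);
        localStore.put(key, merged);
        // Hand downstream a copy so later mutations cannot corrupt the stored state.
        return new HashMap<>(merged);
    }
}
```

Called twice for the same key, the second result already contains the fields from the first call, regardless of timing.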

Best,
Bruno
