Posted to users@kafka.apache.org by Pirow Engelbrecht <pi...@etion.co.za> on 2020/08/19 12:53:36 UTC

Kafka Streams Key-value store question

Hello,

We're building a JSON decorator using Kafka Streams' Processor API.

Briefly, the process is that a piece of JSON is consumed from an input topic (keys are null, the value is the JSON). The JSON contains a field (e.g. "thisField") with a value (e.g. "someLink"). This value (and a timestamp) is used to look up another piece of JSON from a key-value topic (keys are the different values of "thisField", values are JSON). This key-value topic is created by another service in Kafka. The looked-up JSON is then appended to the input JSON and the result is written to an output topic (keys are null, the value is now the original JSON plus the lookup JSON).
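For illustration (the field names here are just placeholders), a single record is transformed roughly like this:

Input value:    {"thisField": "someLink", "payload": "..."}
Lookup topic:   key "someLink" -> {"decoration": "..."}
Output value:   {"thisField": "someLink", "payload": "...", "decoration": "..."}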

To do the query against a key-value store, ideally I want Kafka Streams to directly create and update a windowed key-value store in memory (or on disk) from my key-value topic in Kafka, but I am unable to find a way to specify this through the StoreBuilder interface. Does anybody know how to do this?
Here is my current StoreBuilder code snippet:
// Build a persistent window store for lookups; retention and window size
// are both ~40 years here, to effectively keep everything.
StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
        Stores.persistentWindowStore("lookupStore", Duration.ofDays(14600),
                Duration.ofDays(14600), false),
        Serdes.String(),
        Serdes.String());
storeBuilder.build();


Currently my workaround is to have a source for the key-value store topic and then create/update this key-value store using a node in the processing topology, but this has issues when the service restarts: the key-value topic needs to be consumed from the start to rebuild the store in memory, but the source will already have committed offsets, which prevents the topic from being consumed from the start. I also cannot use streams.cleanUp(), as this would reset all the sources in my topology (my other source ingests records from the input topic).

Thanks

Pirow Engelbrecht
System Engineer




Re: Kafka Streams Key-value store question

Posted by Nicolas Carlot <ni...@chronopost.fr.INVALID>.
You need to set the auto offset reset to "earliest"; it defaults to "latest":

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");




Re: Kafka Streams Key-value store question

Posted by "Matthias J. Sax" <mj...@apache.org>.
`auto.offset.reset` does not apply to global-store-topics.

At startup, the app will always "seek to beginning" for a
global-store-topic, bootstrap the global store, and only afterwards start
the actual processing.

However, no offsets are committed for global-store-topics. Maybe this is
the reason why you think no data was read?
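
If you want to convince yourself that the store was bootstrapped, you can query it once the application is RUNNING. A sketch, assuming the window store from earlier in the thread (the store name "lookupStore" and key "someLink" are illustrative):

import java.time.Instant;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

ReadOnlyWindowStore<String, String> store = streams.store(
        StoreQueryParameters.fromNameAndType("lookupStore", QueryableStoreTypes.windowStore()));
// Fetch all values for a key across the full time range; any entries
// prove the bootstrap phase populated the store.
try (WindowStoreIterator<String> iter = store.fetch("someLink", Instant.EPOCH, Instant.now())) {
    iter.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
}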


-Matthias



Re: Kafka Streams Key-value store question

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Pirow,

You can configure the auto offset reset for your stream source's consumer
to "earliest" if you want to consume all available data when no committed
offset exists. This will populate the state store on the first run.

Cheers,

Liam Clarke-Hutchinson



RE: Kafka Streams Key-value store question

Posted by Pirow Engelbrecht <pi...@etion.co.za>.
Hi Bill,

Yes, that seems to be exactly what I need. I’ve instantiated this global store with:
topology.addGlobalStore(storeBuilder, "KVstoreSource",
        Serdes.String().deserializer(), Serdes.String().deserializer(),
        this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);


I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs into the store. The problem is that if the application starts for the first time, it does not process any key-value pairs already in the Kafka topic. Is there a way around this?
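
For reference, a minimal sketch of such a processor (assuming the window store from earlier, registered under the name "lookupStore"; the names are illustrative):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

public class KeyValueProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private WindowStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // The store registered via addGlobalStore is retrieved by name.
        this.store = (WindowStore<String, String>) context.getStateStore("lookupStore");
    }

    @Override
    public void process(String key, String value) {
        // Write each record from the key-value topic into the store,
        // windowed by the record's timestamp.
        store.put(key, value, context.timestamp());
    }

    @Override
    public void close() { }
}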

Thanks

Pirow Engelbrecht
System Engineer



Re: Kafka Streams Key-value store question

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Pirow,

If I'm understanding your requirements correctly, I think using a global store
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier->
will work for you.
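
For example, a minimal sketch against that API (the topic name is a placeholder, and KeyValueProcessor stands in for whatever processor writes records into the store):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

StreamsBuilder builder = new StreamsBuilder();
builder.addGlobalStore(
        storeBuilder,                                     // your WindowStore builder from the snippet
        "kv-store-topic",                                 // placeholder lookup topic
        Consumed.with(Serdes.String(), Serdes.String()),
        KeyValueProcessor::new);                          // writes each record into the store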

HTH,
Bill
