You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/05/01 06:11:12 UTC

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

thanks... will try with GlobalKTable.
As a side question, I didn't really understand the significance of global
state store which kind of works in a reverse way to local state store i.e.
local state store is updated and then saved to changelog topic whereas in
case of global state store the topic is updated first and then synced to
global state store. Do these two work in sync i.e. the update to topic and
global state store ?

Say one stream thread updates the topic for global store and starts
processing next event wherein the processor tries to read the global store
which may not have been synced with the topic?

On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:

> Yes.
>
> A `GlobalKTable` uses a global store internally.
>
> You can also use `StreamsBuilder.addGlobalStore()` or
> `Topology.addGlobalStore()` to add a global store "manually".
>
>
> -Matthias
>
>
> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > Thanks Matthias.
> > Can you elaborate on the replicated caching layer part?
> > When you say global stores, do you mean GlobalKTable created from a topic
> > e.g. using StreamsBuilder.globalTable(String topic) method ?
> >
> > On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> It's not possible to modify state store from "outside".
> >>
> >> If you want to build a "replicated caching layer", you could use global
> >> stores and write into the corresponding topics to update all stores. Of
> >> course, those updates would be async.
> >>
> >>
> >> -Matthias
> >>
> >> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>> Hi All,
> >>>
> >>> I am wondering if this is possible: i have been asked to use state
> stores
> >>> as a general replicated cache among multiple instances of service
> >> instances
> >>> however the state store is created through streambuilder but is not
> >>> actually modified through stream processor topology however it is to be
> >>> modified from outside the stream topology. So, essentially, the state
> >> store
> >>> is just to be created from streambuilder and then to be used as an
> >>> application level cache that will get replicated between application
> >>> instances. Is this possible using state stores?
> >>>
> >>> Secondly, if possible, is this a good design approach?
> >>>
> >>> Appreciate your response since I don't know the internals of state
> >> stores.
> >>>
> >>
> >>
> >
>
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Hi Matthias,

I have configured the GlobalKTable to stream from a topic and application
is working fine, however during automated build test cases, sometimes I get
an exception: I believe this could be because of race between actual topic
creation and the service startup (since topic creation may not happen
immediately and the API returns future).
The question I have here: is there any property that I can set so the
streams application would perform some retries to establish the global
store before giving up with an exception?

","logger_name":"com.avaya.analytics.dsi.DsiMain","thread_name":"main","level":"DEBUG","level_value":10000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
There are no partitions available for topic analytics-group-cache when
initializing global store group-cache-store\n\tat
org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.topicPartitionsForStore(GlobalStateManagerImpl.java:265)\n\tat
org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.register(GlobalStateManagerImpl.java:190)\n\tat
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:101)\n\tat
org.apache.kafka.streams.state.internals.InMemoryKeyValueStore.init(InMemoryKeyValueStore.java:58)\n\tat
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder$InMemoryTimestampedKeyValueStoreMarker.init(TimestampedKeyValueStoreBuilder.java:100)\n\tat
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)\n\tat
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)\n\tat
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)\n\tat
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:112)\n\tat
org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:136)\n\tat
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)\n\tat
org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.initialize(GlobalStreamThread.java:229)\n\tat
org.apache.kafka.streams.processor.internals.GlobalStreamThread.initialize(GlobalStreamThread.java:345)\n\tat
org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:270)\n"}




On Thu, May 28, 2020 at 1:27 PM Pushkar Deole <pd...@gmail.com> wrote:

> Matthias,
>
> I realized that the exception and actual problem is totally different. The
> problem was the client was not set with SSL truststore while server is
> SSLenabled.
> I also found this open bug on kafka
> https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on stream, I am able to get it up and
> running.
>
> Due to above problem, it is very difficult to debug the issue and above
> bug can be fixed as soon as possible, or a proper exception should be
> thrown.
>
> On Wed, May 27, 2020 at 10:59 PM Pushkar Deole <pd...@gmail.com>
> wrote:
>
>> Thanks... i will try increasing the memory in case you don't spot
>> anything wrong with the code. Other service also have streams and global k
>> table but they use spring-kafka, but i think that should not matter, and it
>> should work with normal kafka-streams code unless i am missing some
>> configuration/setting here
>>
>> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>
>>> There is no hook. Only a restore listener, but this one is only used
>>> during startup when the global store is loaded. It's not sure during
>>> regular processing.
>>>
>>> Depending on your usage, maybe you can switch to a global store instead
>>> of GlobalKTable? That way, you can implement a custom `Processor` and
>>> add a hook manually?
>>>
>>> I don't see anything wrong with your setup. Unclear if/why the global
>>> store would require a lot of memory...
>>>
>>>
>>> -Matthias
>>>
>>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>>> > Matthias,
>>> > I tried with default store as well but getting same error, can you
>>> please
>>> > check if I am initializing the global store in the right way:
>>> >
>>> > public void setupGlobalCacheTables(String theKafkaServers) {
>>> >     Properties props = new Properties();
>>> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> DEFAULT_APPLICATION_ID);
>>> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>>> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
>>> >     groupCacheTable =
>>> >     streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>>> > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>>> >         Materialized.as(GROUP_CACHE_STORE_NAME));
>>> >     Topology groupCacheTopology = streamsBuilder.build();
>>> >      kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>>> >       kafkaStreams.start();
>>> >
>>> > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>> > LOG.info("Stopping the stream");
>>> > kafkaStreams.close();
>>> > }));
>>> > }
>>> >
>>> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pd...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi Matthias,
>>> >>
>>> >> By the way, I used the in-memory global store and the service is
>>> giving
>>> >> out of memory error during startup. Unfortunately i don't have a stack
>>> >> trace now but when i got stack the first time, the error was coming
>>> >> somewhere from memorypool.allocate or similar kind of method. If i
>>> get the
>>> >> stack trace again, I will share that with you.
>>> >> However, the topic from where the store is reading from is empty so I
>>> am
>>> >> not sure why the global k table is trying to occupy a lot of space.
>>> The POD
>>> >> memory request and limits are 500 MiB and 750 MiB respectively so the
>>> state
>>> >> store should fit into this memory I believe since topic is empty. Can
>>> you
>>> >> provide inputs on this.
>>> >>
>>> >>
>>> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com>
>>> >> wrote:
>>> >>
>>> >>> Ok... got it... is there any hook that I can attach to the global k
>>> table
>>> >>> or global store? What I mean here is I want to know when the global
>>> store
>>> >>> is updated with data from topic in that case the hook that I
>>> specified
>>> >>> should be invoked so i can do some activity like logging that, this
>>> will
>>> >>> allow me to know how long the global store took to sync up with
>>> topic after
>>> >>> the event has been put on the topic.
>>> >>>
>>> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>>> >>> wrote:
>>> >>>
>>> >>>> For example it could be some "static" information, like a mapping
>>> from
>>> >>>> zip code to city name.
>>> >>>>
>>> >>>> Something that does usually not change over time.
>>> >>>>
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>> >>>>> Matthias,
>>> >>>>>
>>> >>>>> I am wondering what you mean by "Global store hold "axially" data
>>> that
>>> >>>> is
>>> >>>>> provided from "outside" of the
>>> >>>>> app"
>>> >>>>>
>>> >>>>> will you be able to give some example use case here as to what you
>>> >>>> mean by
>>> >>>>> axially data provided from outside app?
>>> >>>>>
>>> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>>> >>>> wrote:
>>> >>>>>
>>> >>>>>> Both stores sever a different purpose.
>>> >>>>>>
>>> >>>>>> Regular stores allow you to store state the application computes.
>>> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
>>> >>>>>>
>>> >>>>>> Global store hold "axially" data that is provided from "outside"
>>> of
>>> >>>> the
>>> >>>>>> app. There is no changelog topic, but only the input topic (that
>>> is
>>> >>>> used
>>> >>>>>> to re-create the global state).
>>> >>>>>>
>>> >>>>>> Local stores are sharded and updates are "sync" as they don't
>>> need to
>>> >>>> be
>>> >>>>>> shared with anybody else.
>>> >>>>>>
>>> >>>>>> For global stores, as all instances need to be updated, updates
>>> are
>>> >>>>>> async (we don't know when which instance will update it's own
>>> global
>>> >>>>>> store replica).
>>> >>>>>>
>>> >>>>>>>> Say one stream thread updates the topic for global store and
>>> starts
>>> >>>>>>>> processing next event wherein the processor tries to read the
>>> global
>>> >>>>>> store
>>> >>>>>>>> which may not have been synced with the topic?
>>> >>>>>>
>>> >>>>>> Correct. There is no guarantee when the update to the global store
>>> >>>> will
>>> >>>>>> be applied. As said, global stores are not designed to hold data
>>> the
>>> >>>>>> application computes.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>> >>>>>>> thanks... will try with GlobalKTable.
>>> >>>>>>> As a side question, I didn't really understand the significance
>>> of
>>> >>>> global
>>> >>>>>>> state store which kind of works in a reverse way to local state
>>> store
>>> >>>>>> i.e.
>>> >>>>>>> local state store is updated and then saved to changelog topic
>>> >>>> whereas in
>>> >>>>>>> case of global state store the topic is updated first and then
>>> >>>> synced to
>>> >>>>>>> global state store. Do these two work in sync i.e. the update to
>>> >>>> topic
>>> >>>>>> and
>>> >>>>>>> global state store ?
>>> >>>>>>>
>>> >>>>>>> Say one stream thread updates the topic for global store and
>>> starts
>>> >>>>>>> processing next event wherein the processor tries to read the
>>> global
>>> >>>>>> store
>>> >>>>>>> which may not have been synced with the topic?
>>> >>>>>>>
>>> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mjsax@apache.org
>>> >
>>> >>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Yes.
>>> >>>>>>>>
>>> >>>>>>>> A `GlobalKTable` uses a global store internally.
>>> >>>>>>>>
>>> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>> >>>>>>>>> Thanks Matthias.
>>> >>>>>>>>> Can you elaborate on the replicated caching layer part?
>>> >>>>>>>>> When you say global stores, do you mean GlobalKTable created
>>> from a
>>> >>>>>> topic
>>> >>>>>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>> >>>>>>>>>
>>> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
>>> mjsax@apache.org
>>> >>>>>
>>> >>>>>>>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> It's not possible to modify state store from "outside".
>>> >>>>>>>>>>
>>> >>>>>>>>>> If you want to build a "replicated caching layer", you could
>>> use
>>> >>>>>> global
>>> >>>>>>>>>> stores and write into the corresponding topics to update all
>>> >>>> stores.
>>> >>>>>> Of
>>> >>>>>>>>>> course, those updates would be async.
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> -Matthias
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>> >>>>>>>>>>> Hi All,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> I am wondering if this is possible: i have been asked to use
>>> >>>> state
>>> >>>>>>>> stores
>>> >>>>>>>>>>> as a general replicated cache among multiple instances of
>>> service
>>> >>>>>>>>>> instances
>>> >>>>>>>>>>> however the state store is created through streambuilder but
>>> is
>>> >>>> not
>>> >>>>>>>>>>> actually modified through stream processor topology however
>>> it
>>> >>>> is to
>>> >>>>>> be
>>> >>>>>>>>>>> modified from outside the stream topology. So, essentially,
>>> the
>>> >>>> state
>>> >>>>>>>>>> store
>>> >>>>>>>>>>> is just to be created from streambuilder and then to be used
>>> as
>>> >>>> an
>>> >>>>>>>>>>> application level cache that will get replicated between
>>> >>>> application
>>> >>>>>>>>>>> instances. Is this possible using state stores?
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Appreciate your response since I don't know the internals of
>>> >>>> state
>>> >>>>>>>>>> stores.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >
>>>
>>>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

I realized that the exception and actual problem is totally different. The
problem was the client was not set with SSL truststore while server is
SSLenabled.
I also found this open bug on kafka
https://issues.apache.org/jira/browse/KAFKA-4493
After setting the SSL properties on stream, I am able to get it up and
running.

Due to above problem, it is very difficult to debug the issue and above bug
can be fixed as soon as possible, or a proper exception should be thrown.

On Wed, May 27, 2020 at 10:59 PM Pushkar Deole <pd...@gmail.com> wrote:

> Thanks... i will try increasing the memory in case you don't spot anything
> wrong with the code. Other service also have streams and global k table but
> they use spring-kafka, but i think that should not matter, and it should
> work with normal kafka-streams code unless i am missing some
> configuration/setting here
>
> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> There is no hook. Only a restore listener, but this one is only used
>> during startup when the global store is loaded. It's not sure during
>> regular processing.
>>
>> Depending on your usage, maybe you can switch to a global store instead
>> of GlobalKTable? That way, you can implement a custom `Processor` and
>> add a hook manually?
>>
>> I don't see anything wrong with your setup. Unclear if/why the global
>> store would require a lot of memory...
>>
>>
>> -Matthias
>>
>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>> > Matthias,
>> > I tried with default store as well but getting same error, can you
>> please
>> > check if I am initializing the global store in the right way:
>> >
>> > public void setupGlobalCacheTables(String theKafkaServers) {
>> >     Properties props = new Properties();
>> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> DEFAULT_APPLICATION_ID);
>> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
>> >     groupCacheTable =
>> >     streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>> > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>> >         Materialized.as(GROUP_CACHE_STORE_NAME));
>> >     Topology groupCacheTopology = streamsBuilder.build();
>> >      kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>> >       kafkaStreams.start();
>> >
>> > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>> > LOG.info("Stopping the stream");
>> > kafkaStreams.close();
>> > }));
>> > }
>> >
>> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pd...@gmail.com>
>> wrote:
>> >
>> >> Hi Matthias,
>> >>
>> >> By the way, I used the in-memory global store and the service is giving
>> >> out of memory error during startup. Unfortunately i don't have a stack
>> >> trace now but when i got stack the first time, the error was coming
>> >> somewhere from memorypool.allocate or similar kind of method. If i get
>> the
>> >> stack trace again, I will share that with you.
>> >> However, the topic from where the store is reading from is empty so I
>> am
>> >> not sure why the global k table is trying to occupy a lot of space.
>> The POD
>> >> memory request and limits are 500 MiB and 750 MiB respectively so the
>> state
>> >> store should fit into this memory I believe since topic is empty. Can
>> you
>> >> provide inputs on this.
>> >>
>> >>
>> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com>
>> >> wrote:
>> >>
>> >>> Ok... got it... is there any hook that I can attach to the global k
>> table
>> >>> or global store? What I mean here is I want to know when the global
>> store
>> >>> is updated with data from topic in that case the hook that I specified
>> >>> should be invoked so i can do some activity like logging that, this
>> will
>> >>> allow me to know how long the global store took to sync up with topic
>> after
>> >>> the event has been put on the topic.
>> >>>
>> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>> >>> wrote:
>> >>>
>> >>>> For example it could be some "static" information, like a mapping
>> from
>> >>>> zip code to city name.
>> >>>>
>> >>>> Something that does usually not change over time.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>> >>>>> Matthias,
>> >>>>>
>> >>>>> I am wondering what you mean by "Global store hold "axially" data
>> that
>> >>>> is
>> >>>>> provided from "outside" of the
>> >>>>> app"
>> >>>>>
>> >>>>> will you be able to give some example use case here as to what you
>> >>>> mean by
>> >>>>> axially data provided from outside app?
>> >>>>>
>> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>> >>>> wrote:
>> >>>>>
>> >>>>>> Both stores sever a different purpose.
>> >>>>>>
>> >>>>>> Regular stores allow you to store state the application computes.
>> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
>> >>>>>>
>> >>>>>> Global store hold "axially" data that is provided from "outside" of
>> >>>> the
>> >>>>>> app. There is no changelog topic, but only the input topic (that is
>> >>>> used
>> >>>>>> to re-create the global state).
>> >>>>>>
>> >>>>>> Local stores are sharded and updates are "sync" as they don't need
>> to
>> >>>> be
>> >>>>>> shared with anybody else.
>> >>>>>>
>> >>>>>> For global stores, as all instances need to be updated, updates are
>> >>>>>> async (we don't know when which instance will update it's own
>> global
>> >>>>>> store replica).
>> >>>>>>
>> >>>>>>>> Say one stream thread updates the topic for global store and
>> starts
>> >>>>>>>> processing next event wherein the processor tries to read the
>> global
>> >>>>>> store
>> >>>>>>>> which may not have been synced with the topic?
>> >>>>>>
>> >>>>>> Correct. There is no guarantee when the update to the global store
>> >>>> will
>> >>>>>> be applied. As said, global stores are not designed to hold data
>> the
>> >>>>>> application computes.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> >>>>>>> thanks... will try with GlobalKTable.
>> >>>>>>> As a side question, I didn't really understand the significance of
>> >>>> global
>> >>>>>>> state store which kind of works in a reverse way to local state
>> store
>> >>>>>> i.e.
>> >>>>>>> local state store is updated and then saved to changelog topic
>> >>>> whereas in
>> >>>>>>> case of global state store the topic is updated first and then
>> >>>> synced to
>> >>>>>>> global state store. Do these two work in sync i.e. the update to
>> >>>> topic
>> >>>>>> and
>> >>>>>>> global state store ?
>> >>>>>>>
>> >>>>>>> Say one stream thread updates the topic for global store and
>> starts
>> >>>>>>> processing next event wherein the processor tries to read the
>> global
>> >>>>>> store
>> >>>>>>> which may not have been synced with the topic?
>> >>>>>>>
>> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>> >>>> wrote:
>> >>>>>>>
>> >>>>>>>> Yes.
>> >>>>>>>>
>> >>>>>>>> A `GlobalKTable` uses a global store internally.
>> >>>>>>>>
>> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> >>>>>>>>> Thanks Matthias.
>> >>>>>>>>> Can you elaborate on the replicated caching layer part?
>> >>>>>>>>> When you say global stores, do you mean GlobalKTable created
>> from a
>> >>>>>> topic
>> >>>>>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
>> mjsax@apache.org
>> >>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> It's not possible to modify state store from "outside".
>> >>>>>>>>>>
>> >>>>>>>>>> If you want to build a "replicated caching layer", you could
>> use
>> >>>>>> global
>> >>>>>>>>>> stores and write into the corresponding topics to update all
>> >>>> stores.
>> >>>>>> Of
>> >>>>>>>>>> course, those updates would be async.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> -Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>> >>>>>>>>>>> Hi All,
>> >>>>>>>>>>>
>> >>>>>>>>>>> I am wondering if this is possible: i have been asked to use
>> >>>> state
>> >>>>>>>> stores
>> >>>>>>>>>>> as a general replicated cache among multiple instances of
>> service
>> >>>>>>>>>> instances
>> >>>>>>>>>>> however the state store is created through streambuilder but
>> is
>> >>>> not
>> >>>>>>>>>>> actually modified through stream processor topology however it
>> >>>> is to
>> >>>>>> be
>> >>>>>>>>>>> modified from outside the stream topology. So, essentially,
>> the
>> >>>> state
>> >>>>>>>>>> store
>> >>>>>>>>>>> is just to be created from streambuilder and then to be used
>> as
>> >>>> an
>> >>>>>>>>>>> application level cache that will get replicated between
>> >>>> application
>> >>>>>>>>>>> instances. Is this possible using state stores?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Appreciate your response since I don't know the internals of
>> >>>> state
>> >>>>>>>>>> stores.
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >
>>
>>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks... i will try increasing the memory in case you don't spot anything
wrong with the code. Other service also have streams and global k table but
they use spring-kafka, but i think that should not matter, and it should
work with normal kafka-streams code unless i am missing some
configuration/setting here

On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:

> There is no hook. Only a restore listener, but this one is only used
> during startup when the global store is loaded. It's not sure during
> regular processing.
>
> Depending on your usage, maybe you can switch to a global store instead
> of GlobalKTable? That way, you can implement a custom `Processor` and
> add a hook manually?
>
> I don't see anything wrong with your setup. Unclear if/why the global
> store would require a lot of memory...
>
>
> -Matthias
>
> On 5/27/20 7:41 AM, Pushkar Deole wrote:
> > Matthias,
> > I tried with default store as well but getting same error, can you please
> > check if I am initializing the global store in the right way:
> >
> > public void setupGlobalCacheTables(String theKafkaServers) {
> >     Properties props = new Properties();
> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> DEFAULT_APPLICATION_ID);
> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
> >     groupCacheTable =
> >     streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> >         Materialized.as(GROUP_CACHE_STORE_NAME));
> >     Topology groupCacheTopology = streamsBuilder.build();
> >      kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> >       kafkaStreams.start();
> >
> > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > LOG.info("Stopping the stream");
> > kafkaStreams.close();
> > }));
> > }
> >
> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pd...@gmail.com>
> wrote:
> >
> >> Hi Matthias,
> >>
> >> By the way, I used the in-memory global store and the service is giving
> >> out of memory error during startup. Unfortunately i don't have a stack
> >> trace now but when i got stack the first time, the error was coming
> >> somewhere from memorypool.allocate or similar kind of method. If i get
> the
> >> stack trace again, I will share that with you.
> >> However, the topic from where the store is reading from is empty so I am
> >> not sure why the global k table is trying to occupy a lot of space. The
> POD
> >> memory request and limits are 500 MiB and 750 MiB respectively so the
> state
> >> store should fit into this memory I believe since topic is empty. Can
> you
> >> provide inputs on this.
> >>
> >>
> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com>
> >> wrote:
> >>
> >>> Ok... got it... is there any hook that I can attach to the global k
> table
> >>> or global store? What I mean here is I want to know when the global
> store
> >>> is updated with data from topic in that case the hook that I specified
> >>> should be invoked so i can do some activity like logging that, this
> will
> >>> allow me to know how long the global store took to sync up with topic
> after
> >>> the event has been put on the topic.
> >>>
> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>
> >>>> For example it could be some "static" information, like a mapping from
> >>>> zip code to city name.
> >>>>
> >>>> Something that does usually not change over time.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> I am wondering what you mean by "Global store hold "axially" data
> that
> >>>> is
> >>>>> provided from "outside" of the
> >>>>> app"
> >>>>>
> >>>>> will you be able to give some example use case here as to what you
> >>>> mean by
> >>>>> axially data provided from outside app?
> >>>>>
> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Both stores sever a different purpose.
> >>>>>>
> >>>>>> Regular stores allow you to store state the application computes.
> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
> >>>>>>
> >>>>>> Global store hold "axially" data that is provided from "outside" of
> >>>> the
> >>>>>> app. There is no changelog topic, but only the input topic (that is
> >>>> used
> >>>>>> to re-create the global state).
> >>>>>>
> >>>>>> Local stores are sharded and updates are "sync" as they don't need
> to
> >>>> be
> >>>>>> shared with anybody else.
> >>>>>>
> >>>>>> For global stores, as all instances need to be updated, updates are
> >>>>>> async (we don't know when which instance will update it's own global
> >>>>>> store replica).
> >>>>>>
> >>>>>>>> Say one stream thread updates the topic for global store and
> starts
> >>>>>>>> processing next event wherein the processor tries to read the
> global
> >>>>>> store
> >>>>>>>> which may not have been synced with the topic?
> >>>>>>
> >>>>>> Correct. There is no guarantee when the update to the global store
> >>>> will
> >>>>>> be applied. As said, global stores are not designed to hold data the
> >>>>>> application computes.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> >>>>>>> thanks... will try with GlobalKTable.
> >>>>>>> As a side question, I didn't really understand the significance of
> >>>> global
> >>>>>>> state store which kind of works in a reverse way to local state
> store
> >>>>>> i.e.
> >>>>>>> local state store is updated and then saved to changelog topic
> >>>> whereas in
> >>>>>>> case of global state store the topic is updated first and then
> >>>> synced to
> >>>>>>> global state store. Do these two work in sync i.e. the update to
> >>>> topic
> >>>>>> and
> >>>>>>> global state store ?
> >>>>>>>
> >>>>>>> Say one stream thread updates the topic for global store and starts
> >>>>>>> processing next event wherein the processor tries to read the
> global
> >>>>>> store
> >>>>>>> which may not have been synced with the topic?
> >>>>>>>
> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>> A `GlobalKTable` uses a global store internally.
> >>>>>>>>
> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>>>>>>>> Thanks Matthias.
> >>>>>>>>> Can you elaborate on the replicated caching layer part?
> >>>>>>>>> When you say global stores, do you mean GlobalKTable created
> from a
> >>>>>> topic
> >>>>>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> mjsax@apache.org
> >>>>>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> It's not possible to modify state store from "outside".
> >>>>>>>>>>
> >>>>>>>>>> If you want to build a "replicated caching layer", you could use
> >>>>>> global
> >>>>>>>>>> stores and write into the corresponding topics to update all
> >>>> stores.
> >>>>>> Of
> >>>>>>>>>> course, those updates would be async.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>>>>>>>>>> Hi All,
> >>>>>>>>>>>
> >>>>>>>>>>> I am wondering if this is possible: i have been asked to use
> >>>> state
> >>>>>>>> stores
> >>>>>>>>>>> as a general replicated cache among multiple instances of
> service
> >>>>>>>>>> instances
> >>>>>>>>>>> however the state store is created through streambuilder but is
> >>>> not
> >>>>>>>>>>> actually modified through stream processor topology however it
> >>>> is to
> >>>>>> be
> >>>>>>>>>>> modified from outside the stream topology. So, essentially, the
> >>>> state
> >>>>>>>>>> store
> >>>>>>>>>>> is just to be created from streambuilder and then to be used as
> >>>> an
> >>>>>>>>>>> application level cache that will get replicated between
> >>>> application
> >>>>>>>>>>> instances. Is this possible using state stores?
> >>>>>>>>>>>
> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
> >>>>>>>>>>>
> >>>>>>>>>>> Appreciate your response since I don't know the internals of
> >>>> state
> >>>>>>>>>> stores.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >
>
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by "Matthias J. Sax" <mj...@apache.org>.
There is no hook. Only a restore listener, but this one is only used
during startup when the global store is loaded. It's not sure during
regular processing.

Depending on your usage, maybe you can switch to a global store instead
of GlobalKTable? That way, you can implement a custom `Processor` and
add a hook manually?

I don't see anything wrong with your setup. Unclear if/why the global
store would require a lot of memory...


-Matthias

On 5/27/20 7:41 AM, Pushkar Deole wrote:
> Matthias,
> I tried with default store as well but getting same error, can you please
> check if I am initializing the global store in the right way:
> 
> public void setupGlobalCacheTables(String theKafkaServers) {
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>     StreamsBuilder streamsBuilder = new StreamsBuilder();
>     groupCacheTable =
>     streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>         Materialized.as(GROUP_CACHE_STORE_NAME));
>     Topology groupCacheTopology = streamsBuilder.build();
>      kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>       kafkaStreams.start();
> 
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> LOG.info("Stopping the stream");
> kafkaStreams.close();
> }));
> }
> 
> On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pd...@gmail.com> wrote:
> 
>> Hi Matthias,
>>
>> By the way, I used the in-memory global store and the service is giving
>> out of memory error during startup. Unfortunately i don't have a stack
>> trace now but when i got stack the first time, the error was coming
>> somewhere from memorypool.allocate or similar kind of method. If i get the
>> stack trace again, I will share that with you.
>> However, the topic from where the store is reading from is empty so I am
>> not sure why the global k table is trying to occupy a lot of space. The POD
>> memory request and limits are 500 MiB and 750 MiB respectively so the state
>> store should fit into this memory I believe since topic is empty. Can you
>> provide inputs on this.
>>
>>
>> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com>
>> wrote:
>>
>>> Ok... got it... is there any hook that I can attach to the global k table
>>> or global store? What I mean here is I want to know when the global store
>>> is updated with data from topic in that case the hook that I specified
>>> should be invoked so i can do some activity like logging that, this will
>>> allow me to know how long the global store took to sync up with topic after
>>> the event has been put on the topic.
>>>
>>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>>
>>>> For example it could be some "static" information, like a mapping from
>>>> zip code to city name.
>>>>
>>>> Something that does usually not change over time.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>>>> Matthias,
>>>>>
>>>>> I am wondering what you mean by "Global store hold "axially" data that
>>>> is
>>>>> provided from "outside" of the
>>>>> app"
>>>>>
>>>>> will you be able to give some example use case here as to what you
>>>> mean by
>>>>> axially data provided from outside app?
>>>>>
>>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>
>>>>>> Both stores sever a different purpose.
>>>>>>
>>>>>> Regular stores allow you to store state the application computes.
>>>>>> Writing into the changelog is a fault-tolerance mechanism.
>>>>>>
>>>>>> Global store hold "axially" data that is provided from "outside" of
>>>> the
>>>>>> app. There is no changelog topic, but only the input topic (that is
>>>> used
>>>>>> to re-create the global state).
>>>>>>
>>>>>> Local stores are sharded and updates are "sync" as they don't need to
>>>> be
>>>>>> shared with anybody else.
>>>>>>
>>>>>> For global stores, as all instances need to be updated, updates are
>>>>>> async (we don't know when which instance will update it's own global
>>>>>> store replica).
>>>>>>
>>>>>>>> Say one stream thread updates the topic for global store and starts
>>>>>>>> processing next event wherein the processor tries to read the global
>>>>>> store
>>>>>>>> which may not have been synced with the topic?
>>>>>>
>>>>>> Correct. There is no guarantee when the update to the global store
>>>> will
>>>>>> be applied. As said, global stores are not designed to hold data the
>>>>>> application computes.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>>>>>> thanks... will try with GlobalKTable.
>>>>>>> As a side question, I didn't really understand the significance of
>>>> global
>>>>>>> state store which kind of works in a reverse way to local state store
>>>>>> i.e.
>>>>>>> local state store is updated and then saved to changelog topic
>>>> whereas in
>>>>>>> case of global state store the topic is updated first and then
>>>> synced to
>>>>>>> global state store. Do these two work in sync i.e. the update to
>>>> topic
>>>>>> and
>>>>>>> global state store ?
>>>>>>>
>>>>>>> Say one stream thread updates the topic for global store and starts
>>>>>>> processing next event wherein the processor tries to read the global
>>>>>> store
>>>>>>> which may not have been synced with the topic?
>>>>>>>
>>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>>>
>>>>>>>> Yes.
>>>>>>>>
>>>>>>>> A `GlobalKTable` uses a global store internally.
>>>>>>>>
>>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>>>>>>>> Thanks Matthias.
>>>>>>>>> Can you elaborate on the replicated caching layer part?
>>>>>>>>> When you say global stores, do you mean GlobalKTable created from a
>>>>>> topic
>>>>>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>>>>>>>>
>>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mjsax@apache.org
>>>>>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It's not possible to modify state store from "outside".
>>>>>>>>>>
>>>>>>>>>> If you want to build a "replicated caching layer", you could use
>>>>>> global
>>>>>>>>>> stores and write into the corresponding topics to update all
>>>> stores.
>>>>>> Of
>>>>>>>>>> course, those updates would be async.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I am wondering if this is possible: i have been asked to use
>>>> state
>>>>>>>> stores
>>>>>>>>>>> as a general replicated cache among multiple instances of service
>>>>>>>>>> instances
>>>>>>>>>>> however the state store is created through streambuilder but is
>>>> not
>>>>>>>>>>> actually modified through stream processor topology however it
>>>> is to
>>>>>> be
>>>>>>>>>>> modified from outside the stream topology. So, essentially, the
>>>> state
>>>>>>>>>> store
>>>>>>>>>>> is just to be created from streambuilder and then to be used as
>>>> an
>>>>>>>>>>> application level cache that will get replicated between
>>>> application
>>>>>>>>>>> instances. Is this possible using state stores?
>>>>>>>>>>>
>>>>>>>>>>> Secondly, if possible, is this a good design approach?
>>>>>>>>>>>
>>>>>>>>>>> Appreciate your response since I don't know the internals of
>>>> state
>>>>>>>>>> stores.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
> 


Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,
I tried with default store as well but getting same error, can you please
check if I am initializing the global store in the right way:

public void setupGlobalCacheTables(String theKafkaServers) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    groupCacheTable =
    streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
        Materialized.as(GROUP_CACHE_STORE_NAME));
    Topology groupCacheTopology = streamsBuilder.build();
     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
      kafkaStreams.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping the stream");
kafkaStreams.close();
}));
}

On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pd...@gmail.com> wrote:

> Hi Matthias,
>
> By the way, I used the in-memory global store and the service is giving
> out of memory error during startup. Unfortunately i don't have a stack
> trace now but when i got stack the first time, the error was coming
> somewhere from memorypool.allocate or similar kind of method. If i get the
> stack trace again, I will share that with you.
> However, the topic from where the store is reading from is empty so I am
> not sure why the global k table is trying to occupy a lot of space. The POD
> memory request and limits are 500 MiB and 750 MiB respectively so the state
> store should fit into this memory I believe since topic is empty. Can you
> provide inputs on this.
>
>
> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com>
> wrote:
>
>> Ok... got it... is there any hook that I can attach to the global k table
>> or global store? What I mean here is I want to know when the global store
>> is updated with data from topic in that case the hook that I specified
>> should be invoked so i can do some activity like logging that, this will
>> allow me to know how long the global store took to sync up with topic after
>> the event has been put on the topic.
>>
>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>
>>> For example it could be some "static" information, like a mapping from
>>> zip code to city name.
>>>
>>> Something that does usually not change over time.
>>>
>>>
>>> -Matthias
>>>
>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>> > Matthias,
>>> >
>>> > I am wondering what you mean by "Global store hold "axially" data that
>>> is
>>> > provided from "outside" of the
>>> > app"
>>> >
>>> > will you be able to give some example use case here as to what you
>>> mean by
>>> > axially data provided from outside app?
>>> >
>>> > On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>> >
>>> >> Both stores sever a different purpose.
>>> >>
>>> >> Regular stores allow you to store state the application computes.
>>> >> Writing into the changelog is a fault-tolerance mechanism.
>>> >>
>>> >> Global store hold "axially" data that is provided from "outside" of
>>> the
>>> >> app. There is no changelog topic, but only the input topic (that is
>>> used
>>> >> to re-create the global state).
>>> >>
>>> >> Local stores are sharded and updates are "sync" as they don't need to
>>> be
>>> >> shared with anybody else.
>>> >>
>>> >> For global stores, as all instances need to be updated, updates are
>>> >> async (we don't know when which instance will update it's own global
>>> >> store replica).
>>> >>
>>> >>>> Say one stream thread updates the topic for global store and starts
>>> >>>> processing next event wherein the processor tries to read the global
>>> >> store
>>> >>>> which may not have been synced with the topic?
>>> >>
>>> >> Correct. There is no guarantee when the update to the global store
>>> will
>>> >> be applied. As said, global stores are not designed to hold data the
>>> >> application computes.
>>> >>
>>> >>
>>> >> -Matthias
>>> >>
>>> >>
>>> >> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>> >>> thanks... will try with GlobalKTable.
>>> >>> As a side question, I didn't really understand the significance of
>>> global
>>> >>> state store which kind of works in a reverse way to local state store
>>> >> i.e.
>>> >>> local state store is updated and then saved to changelog topic
>>> whereas in
>>> >>> case of global state store the topic is updated first and then
>>> synced to
>>> >>> global state store. Do these two work in sync i.e. the update to
>>> topic
>>> >> and
>>> >>> global state store ?
>>> >>>
>>> >>> Say one stream thread updates the topic for global store and starts
>>> >>> processing next event wherein the processor tries to read the global
>>> >> store
>>> >>> which may not have been synced with the topic?
>>> >>>
>>> >>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>> >>>
>>> >>>> Yes.
>>> >>>>
>>> >>>> A `GlobalKTable` uses a global store internally.
>>> >>>>
>>> >>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>> >>>> `Topology.addGlobalStore()` to add a global store "manually".
>>> >>>>
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>>
>>> >>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>> >>>>> Thanks Matthias.
>>> >>>>> Can you elaborate on the replicated caching layer part?
>>> >>>>> When you say global stores, do you mean GlobalKTable created from a
>>> >> topic
>>> >>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>> >>>>>
>>> >>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mjsax@apache.org
>>> >
>>> >>>> wrote:
>>> >>>>>
>>> >>>>>> It's not possible to modify state store from "outside".
>>> >>>>>>
>>> >>>>>> If you want to build a "replicated caching layer", you could use
>>> >> global
>>> >>>>>> stores and write into the corresponding topics to update all
>>> stores.
>>> >> Of
>>> >>>>>> course, those updates would be async.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>> >>>>>>> Hi All,
>>> >>>>>>>
>>> >>>>>>> I am wondering if this is possible: i have been asked to use
>>> state
>>> >>>> stores
>>> >>>>>>> as a general replicated cache among multiple instances of service
>>> >>>>>> instances
>>> >>>>>>> however the state store is created through streambuilder but is
>>> not
>>> >>>>>>> actually modified through stream processor topology however it
>>> is to
>>> >> be
>>> >>>>>>> modified from outside the stream topology. So, essentially, the
>>> state
>>> >>>>>> store
>>> >>>>>>> is just to be created from streambuilder and then to be used as
>>> an
>>> >>>>>>> application level cache that will get replicated between
>>> application
>>> >>>>>>> instances. Is this possible using state stores?
>>> >>>>>>>
>>> >>>>>>> Secondly, if possible, is this a good design approach?
>>> >>>>>>>
>>> >>>>>>> Appreciate your response since I don't know the internals of
>>> state
>>> >>>>>> stores.
>>> >>>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>> >>
>>> >
>>>
>>>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Hi Matthias,

By the way, I used the in-memory global store and the service is giving out
of memory error during startup. Unfortunately i don't have a stack trace
now but when i got stack the first time, the error was coming somewhere
from memorypool.allocate or similar kind of method. If i get the stack
trace again, I will share that with you.
However, the topic from where the store is reading from is empty so I am
not sure why the global k table is trying to occupy a lot of space. The POD
memory request and limits are 500 MiB and 750 MiB respectively so the state
store should fit into this memory I believe since topic is empty. Can you
provide inputs on this.


On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com> wrote:

> Ok... got it... is there any hook that I can attach to the global k table
> or global store? What I mean here is I want to know when the global store
> is updated with data from topic in that case the hook that I specified
> should be invoked so i can do some activity like logging that, this will
> allow me to know how long the global store took to sync up with topic after
> the event has been put on the topic.
>
> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> For example it could be some "static" information, like a mapping from
>> zip code to city name.
>>
>> Something that does usually not change over time.
>>
>>
>> -Matthias
>>
>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>> > Matthias,
>> >
>> > I am wondering what you mean by "Global store hold "axially" data that
>> is
>> > provided from "outside" of the
>> > app"
>> >
>> > will you be able to give some example use case here as to what you mean
>> by
>> > axially data provided from outside app?
>> >
>> > On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >
>> >> Both stores sever a different purpose.
>> >>
>> >> Regular stores allow you to store state the application computes.
>> >> Writing into the changelog is a fault-tolerance mechanism.
>> >>
>> >> Global store hold "axially" data that is provided from "outside" of the
>> >> app. There is no changelog topic, but only the input topic (that is
>> used
>> >> to re-create the global state).
>> >>
>> >> Local stores are sharded and updates are "sync" as they don't need to
>> be
>> >> shared with anybody else.
>> >>
>> >> For global stores, as all instances need to be updated, updates are
>> >> async (we don't know when which instance will update it's own global
>> >> store replica).
>> >>
>> >>>> Say one stream thread updates the topic for global store and starts
>> >>>> processing next event wherein the processor tries to read the global
>> >> store
>> >>>> which may not have been synced with the topic?
>> >>
>> >> Correct. There is no guarantee when the update to the global store will
>> >> be applied. As said, global stores are not designed to hold data the
>> >> application computes.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> >>> thanks... will try with GlobalKTable.
>> >>> As a side question, I didn't really understand the significance of
>> global
>> >>> state store which kind of works in a reverse way to local state store
>> >> i.e.
>> >>> local state store is updated and then saved to changelog topic
>> whereas in
>> >>> case of global state store the topic is updated first and then synced
>> to
>> >>> global state store. Do these two work in sync i.e. the update to topic
>> >> and
>> >>> global state store ?
>> >>>
>> >>> Say one stream thread updates the topic for global store and starts
>> >>> processing next event wherein the processor tries to read the global
>> >> store
>> >>> which may not have been synced with the topic?
>> >>>
>> >>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >>>
>> >>>> Yes.
>> >>>>
>> >>>> A `GlobalKTable` uses a global store internally.
>> >>>>
>> >>>> You can also use `StreamsBuilder.addGlobalStore()` or
>> >>>> `Topology.addGlobalStore()` to add a global store "manually".
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>>
>> >>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> >>>>> Thanks Matthias.
>> >>>>> Can you elaborate on the replicated caching layer part?
>> >>>>> When you say global stores, do you mean GlobalKTable created from a
>> >> topic
>> >>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>> >>>>>
>> >>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
>> >>>> wrote:
>> >>>>>
>> >>>>>> It's not possible to modify state store from "outside".
>> >>>>>>
>> >>>>>> If you want to build a "replicated caching layer", you could use
>> >> global
>> >>>>>> stores and write into the corresponding topics to update all
>> stores.
>> >> Of
>> >>>>>> course, those updates would be async.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>> >>>>>>> Hi All,
>> >>>>>>>
>> >>>>>>> I am wondering if this is possible: i have been asked to use state
>> >>>> stores
>> >>>>>>> as a general replicated cache among multiple instances of service
>> >>>>>> instances
>> >>>>>>> however the state store is created through streambuilder but is
>> not
>> >>>>>>> actually modified through stream processor topology however it is
>> to
>> >> be
>> >>>>>>> modified from outside the stream topology. So, essentially, the
>> state
>> >>>>>> store
>> >>>>>>> is just to be created from streambuilder and then to be used as an
>> >>>>>>> application level cache that will get replicated between
>> application
>> >>>>>>> instances. Is this possible using state stores?
>> >>>>>>>
>> >>>>>>> Secondly, if possible, is this a good design approach?
>> >>>>>>>
>> >>>>>>> Appreciate your response since I don't know the internals of state
>> >>>>>> stores.
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Ok... got it... is there any hook that I can attach to the global k table
or global store? What I mean here is I want to know when the global store
is updated with data from topic in that case the hook that I specified
should be invoked so i can do some activity like logging that, this will
allow me to know how long the global store took to sync up with topic after
the event has been put on the topic.

On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org> wrote:

> For example it could be some "static" information, like a mapping from
> zip code to city name.
>
> Something that does usually not change over time.
>
>
> -Matthias
>
> On 5/25/20 9:55 PM, Pushkar Deole wrote:
> > Matthias,
> >
> > I am wondering what you mean by "Global store hold "axially" data that is
> > provided from "outside" of the
> > app"
> >
> > will you be able to give some example use case here as to what you mean
> by
> > axially data provided from outside app?
> >
> > On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Both stores sever a different purpose.
> >>
> >> Regular stores allow you to store state the application computes.
> >> Writing into the changelog is a fault-tolerance mechanism.
> >>
> >> Global store hold "axially" data that is provided from "outside" of the
> >> app. There is no changelog topic, but only the input topic (that is used
> >> to re-create the global state).
> >>
> >> Local stores are sharded and updates are "sync" as they don't need to be
> >> shared with anybody else.
> >>
> >> For global stores, as all instances need to be updated, updates are
> >> async (we don't know when which instance will update it's own global
> >> store replica).
> >>
> >>>> Say one stream thread updates the topic for global store and starts
> >>>> processing next event wherein the processor tries to read the global
> >> store
> >>>> which may not have been synced with the topic?
> >>
> >> Correct. There is no guarantee when the update to the global store will
> >> be applied. As said, global stores are not designed to hold data the
> >> application computes.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> >>> thanks... will try with GlobalKTable.
> >>> As a side question, I didn't really understand the significance of
> global
> >>> state store which kind of works in a reverse way to local state store
> >> i.e.
> >>> local state store is updated and then saved to changelog topic whereas
> in
> >>> case of global state store the topic is updated first and then synced
> to
> >>> global state store. Do these two work in sync i.e. the update to topic
> >> and
> >>> global state store ?
> >>>
> >>> Say one stream thread updates the topic for global store and starts
> >>> processing next event wherein the processor tries to read the global
> >> store
> >>> which may not have been synced with the topic?
> >>>
> >>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>
> >>>> Yes.
> >>>>
> >>>> A `GlobalKTable` uses a global store internally.
> >>>>
> >>>> You can also use `StreamsBuilder.addGlobalStore()` or
> >>>> `Topology.addGlobalStore()` to add a global store "manually".
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>>>> Thanks Matthias.
> >>>>> Can you elaborate on the replicated caching layer part?
> >>>>> When you say global stores, do you mean GlobalKTable created from a
> >> topic
> >>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >>>>>
> >>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> It's not possible to modify state store from "outside".
> >>>>>>
> >>>>>> If you want to build a "replicated caching layer", you could use
> >> global
> >>>>>> stores and write into the corresponding topics to update all stores.
> >> Of
> >>>>>> course, those updates would be async.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> I am wondering if this is possible: i have been asked to use state
> >>>> stores
> >>>>>>> as a general replicated cache among multiple instances of service
> >>>>>> instances
> >>>>>>> however the state store is created through streambuilder but is not
> >>>>>>> actually modified through stream processor topology however it is
> to
> >> be
> >>>>>>> modified from outside the stream topology. So, essentially, the
> state
> >>>>>> store
> >>>>>>> is just to be created from streambuilder and then to be used as an
> >>>>>>> application level cache that will get replicated between
> application
> >>>>>>> instances. Is this possible using state stores?
> >>>>>>>
> >>>>>>> Secondly, if possible, is this a good design approach?
> >>>>>>>
> >>>>>>> Appreciate your response since I don't know the internals of state
> >>>>>> stores.
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by "Matthias J. Sax" <mj...@apache.org>.
For example it could be some "static" information, like a mapping from
zip code to city name.

Something that does usually not change over time.


-Matthias

On 5/25/20 9:55 PM, Pushkar Deole wrote:
> Matthias,
> 
> I am wondering what you mean by "Global store hold "axially" data that is
> provided from "outside" of the
> app"
> 
> will you be able to give some example use case here as to what you mean by
> axially data provided from outside app?
> 
> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> Both stores sever a different purpose.
>>
>> Regular stores allow you to store state the application computes.
>> Writing into the changelog is a fault-tolerance mechanism.
>>
>> Global store hold "axially" data that is provided from "outside" of the
>> app. There is no changelog topic, but only the input topic (that is used
>> to re-create the global state).
>>
>> Local stores are sharded and updates are "sync" as they don't need to be
>> shared with anybody else.
>>
>> For global stores, as all instances need to be updated, updates are
>> async (we don't know when which instance will update it's own global
>> store replica).
>>
>>>> Say one stream thread updates the topic for global store and starts
>>>> processing next event wherein the processor tries to read the global
>> store
>>>> which may not have been synced with the topic?
>>
>> Correct. There is no guarantee when the update to the global store will
>> be applied. As said, global stores are not designed to hold data the
>> application computes.
>>
>>
>> -Matthias
>>
>>
>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>> thanks... will try with GlobalKTable.
>>> As a side question, I didn't really understand the significance of global
>>> state store which kind of works in a reverse way to local state store
>> i.e.
>>> local state store is updated and then saved to changelog topic whereas in
>>> case of global state store the topic is updated first and then synced to
>>> global state store. Do these two work in sync i.e. the update to topic
>> and
>>> global state store ?
>>>
>>> Say one stream thread updates the topic for global store and starts
>>> processing next event wherein the processor tries to read the global
>> store
>>> which may not have been synced with the topic?
>>>
>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> Yes.
>>>>
>>>> A `GlobalKTable` uses a global store internally.
>>>>
>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>>> `Topology.addGlobalStore()` to add a global store "manually".
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>>>> Thanks Matthias.
>>>>> Can you elaborate on the replicated caching layer part?
>>>>> When you say global stores, do you mean GlobalKTable created from a
>> topic
>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>>>>
>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
>>>> wrote:
>>>>>
>>>>>> It's not possible to modify state store from "outside".
>>>>>>
>>>>>> If you want to build a "replicated caching layer", you could use
>> global
>>>>>> stores and write into the corresponding topics to update all stores.
>> Of
>>>>>> course, those updates would be async.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am wondering if this is possible: i have been asked to use state
>>>> stores
>>>>>>> as a general replicated cache among multiple instances of service
>>>>>> instances
>>>>>>> however the state store is created through streambuilder but is not
>>>>>>> actually modified through stream processor topology however it is to
>> be
>>>>>>> modified from outside the stream topology. So, essentially, the state
>>>>>> store
>>>>>>> is just to be created from streambuilder and then to be used as an
>>>>>>> application level cache that will get replicated between application
>>>>>>> instances. Is this possible using state stores?
>>>>>>>
>>>>>>> Secondly, if possible, is this a good design approach?
>>>>>>>
>>>>>>> Appreciate your response since I don't know the internals of state
>>>>>> stores.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

I am wondering what you mean by "Global store hold "axially" data that is
provided from "outside" of the
app"

will you be able to give some example use case here as to what you mean by
axially data provided from outside app?

On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:

> Both stores sever a different purpose.
>
> Regular stores allow you to store state the application computes.
> Writing into the changelog is a fault-tolerance mechanism.
>
> Global store hold "axially" data that is provided from "outside" of the
> app. There is no changelog topic, but only the input topic (that is used
> to re-create the global state).
>
> Local stores are sharded and updates are "sync" as they don't need to be
> shared with anybody else.
>
> For global stores, as all instances need to be updated, updates are
> async (we don't know when which instance will update it's own global
> store replica).
>
> >> Say one stream thread updates the topic for global store and starts
> >> processing next event wherein the processor tries to read the global
> store
> >> which may not have been synced with the topic?
>
> Correct. There is no guarantee when the update to the global store will
> be applied. As said, global stores are not designed to hold data the
> application computes.
>
>
> -Matthias
>
>
> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > thanks... will try with GlobalKTable.
> > As a side question, I didn't really understand the significance of global
> > state store which kind of works in a reverse way to local state store
> i.e.
> > local state store is updated and then saved to changelog topic whereas in
> > case of global state store the topic is updated first and then synced to
> > global state store. Do these two work in sync i.e. the update to topic
> and
> > global state store ?
> >
> > Say one stream thread updates the topic for global store and starts
> > processing next event wherein the processor tries to read the global
> store
> > which may not have been synced with the topic?
> >
> > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Yes.
> >>
> >> A `GlobalKTable` uses a global store internally.
> >>
> >> You can also use `StreamsBuilder.addGlobalStore()` or
> >> `Topology.addGlobalStore()` to add a global store "manually".
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>> Thanks Matthias.
> >>> Can you elaborate on the replicated caching layer part?
> >>> When you say global stores, do you mean GlobalKTable created from a
> topic
> >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >>>
> >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>
> >>>> It's not possible to modify state store from "outside".
> >>>>
> >>>> If you want to build a "replicated caching layer", you could use
> global
> >>>> stores and write into the corresponding topics to update all stores.
> Of
> >>>> course, those updates would be async.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>>>> Hi All,
> >>>>>
> >>>>> I am wondering if this is possible: i have been asked to use state
> >> stores
> >>>>> as a general replicated cache among multiple instances of service
> >>>> instances
> >>>>> however the state store is created through streambuilder but is not
> >>>>> actually modified through stream processor topology however it is to
> be
> >>>>> modified from outside the stream topology. So, essentially, the state
> >>>> store
> >>>>> is just to be created from streambuilder and then to be used as an
> >>>>> application level cache that will get replicated between application
> >>>>> instances. Is this possible using state stores?
> >>>>>
> >>>>> Secondly, if possible, is this a good design approach?
> >>>>>
> >>>>> Appreciate your response since I don't know the internals of state
> >>>> stores.
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Hello John, Matthias

Sorry for bothering you, however this is now getting crazier. Initially I
was under the impression that the cache being hold by application is in the
form of key/value where key is the instance of agentId (e.g. 10) and value
will hold other attributes (and their respective values) for that agent
e.g. firstName='Dave', Lastname='Richards' etc.
Now, what I understand is the cache is holding data structures with the key
as fieldName like agentFirstName and it is mapped to value that contains an
arraylist of all firstNames. Then there is another key agentLastName and
that stores all the last names in another arraylist. And in order to tie
the related fields together there is another structures that is holding
agentIds again in form lf arraylist and the index of this arraylist is used
to tie all the field together. e.g. in the arraylist there is agentId 10 at
say index 2 then the application goes into all above mentioned cached
arraylists to fetch all the fields at index 2 in respective arraylist  and
returns the data accordingly.

If the key would have been agentID and value would have been corresponding
fields then it would have been easier to write entire structure into a
topic. Now that the key is field name like mentioned above so storing the
structure in topic is impossible because when multiple instances would
operate on that topic with same key then the updates are going to collide.
Since GlobalKTable in each instance would not be in sync, this could result
in lost updates and other sorts of issues.

Currently this design works because each instance is reading all the
records from input topic like each instance reads all events for all agents
one by one and populates the cache in each instance in memory, so there are
no synchronization issues. But the moment we want to persist this cache
structure to topic I am sure it will fail.

There is an option of complete rewrite of cache to have instance of agentId
as key and values as corresponding fieldName and their respective values
but that would involve a significant effort and testing.
I don't know if you see any workaround to use these structures as they are
and persist them to topic without any sync issues when it gets scaled to
multiple application instances.

Thanks in advance for all the help.


On Thu, May 7, 2020 at 7:27 PM John Roesler <vv...@apache.org> wrote:

> Hi Pushkar,
>
> To answer your question about tuning the global store latency, I think the
> biggest impact thing you can do is to configure the consumer that loads the
> data for global stores. You can pass configs specifically to the global
> consumer with the prefix: “ global.consumer.”
>
> Regarding the larger situation, it seems like the global table and a
> distributed cache would display the same basic behavior in terms of the
> potential for missed joins. Then, it probably makes sense to go for the
> option with fewer components to implement and maintain, which to me points
> to the global KTable.
>
> Since you can anticipate that missed joins can be a problem, you can build
> in some metrics and reporting for how many misses you actually observe, and
> potentially redesign the app if it’s actually a problem.
>
> I hope this helps!
> -John
>
> On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> > Thanks John... appreciate your inputs and suggestions. I have been
> assigned
> > recently to this task (of persisting the cache) and haven't been involved
> > in original design and architecture and agree with all the issues you
> have
> > highlighted.
> > However, at this point, i don't think the application can be converted to
> > streams since the design is not flexible and it would require lot of
> > rewrite of code plus subsequent testing.
> >
> > My first thought was to use external database only,  preferably the
> > distributed caching systems like Apache Ignite since it will have least
> > impact on performance. Going to database for every event would impact the
> > throughput a lot. Probably having distributed caching (key/value pairs)
> > would have comparatively lesser impact.
> > Second choice is to go for GlobalKTable however this needs to be done
> very
> > carefully.
> >
> > Thanks again!
> >
> > On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pd...@gmail.com>
> wrote:
> >
> > > Thanks John... what parameters would affect the latency in case
> > > GlobalKTable will be used and is there any configurations that could be
> > > tuned to minimize the latency of sync with input topic?
> > >
> > > On Mon, May 4, 2020 at 10:20 PM John Roesler <vv...@apache.org>
> wrote:
> > >
> > >> Hello Pushkar,
> > >>
> > >> Yes, that’s correct. The operation you describe is currently not
> > >> supported. If you want to keep the structure you described in place,
> I’d
> > >> suggest using an external database for the admin objects. I’ll give
> another
> > >> idea below.
> > >>
> > >> With your current architecture, I’m a little concerned about data
> races.
> > >> From what I saw, nothing would prevent processing stream records with
> agent
> > >> 10 before you process the admin record with agent 10. This problem
> will
> > >> persist no matter where you locate the cache.
> > >>
> > >> GlobalKTable would no doubt make it worse, since it increases the
> latency
> > >> before admin record 10 is queriable everywhere.
> > >>
> > >> I think you’ll want to make a call between architecture simplicity
> > >> (remote cache or global KTable) vs the probability of missed joins.
> > >>
> > >> I think the “best” way to solve this problem (that comes to mind
> anyway)
> > >> might be to
> > >> 1. Repartition the stream to be co-partitioned with the admin records.
> > >> 2. Do a local (not global) stream-table join
> > >> 3. Enable task idling
> > >>
> > >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make
> the
> > >> agent Id the new key of the stream, and then use ‘through’, (where the
> > >> intermediate topic has the same number of partitions as the admin
> topic) to
> > >> do the repartitioning. In 2.6, there is a “repartition” operator that
> will
> > >> make this easier.
> > >>
> > >> The repartition ensures that all stream records with agent id 10 will
> be
> > >> processed by the same thread that processes the admin records with
> agent id
> > >> 10, hence it will be able to find agent 10 in the local KTable store.
> > >>
> > >> Task idling will minimize your chances of missing any enrichments.
> When a
> > >> task has two inputs (E.g., your repartitioned stream joining with the
> admin
> > >> table), it makes Streams wait until both inputs are buffered before
> > >> processing, so it can do a better job of processing in timestamp
> order.
> > >>
> > >> I hope this helps!
> > >> -John
> > >>
> > >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > >> > If i understand correctly, Kafka is not designed to provide
> replicated
> > >> > caching mechanism wherein the updates to cache will be synchronous
> > >> across
> > >> > multiple cache instances.
> > >> >
> > >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pdeole2015@gmail.com
> >
> > >> wrote:
> > >> >
> > >> > > Thanks John.
> > >> > >
> > >> > > Actually, this is a normal consumer-producer application wherein
> > >> there are
> > >> > > 2 consumers (admin consumer and main consumer) consuming messages
> > >> from 2
> > >> > > different topics.
> > >> > > One of the consumers consumes messages from a admin topic and
> > >> populates
> > >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> > >> first
> > >> > > name and last name is received is populated in cache. When the
> other
> > >> > > consumer consumes message and it has agent id 10 then it reads the
> > >> cache,
> > >> > > appends the first name and last name and then sends enriched
> event to
> > >> > > producer.
> > >> > > In this case, each application instance consumes all the events
> from
> > >> admin
> > >> > > topic (unique consumer id) and keeps them in the cache in memory.
> > >> > > Now the requirement is to persist the cache and make is shared
> > >> between the
> > >> > > application instances, so each instance would consume partitions
> of
> > >> admin
> > >> > > topic and write to admin cache.
> > >> > >
> > >> > > If we want to use kafka streams, the application is so much
> evolved
> > >> that
> > >> > > it is difficult to migrate to streams at this stage. Secondly,
> from
> > >> past
> > >> > > mail chains, streams also won't serve the requirement since local
> > >> state
> > >> > > stores would just hold the local state of admin data and the cache
> > >> written
> > >> > > by each instance won't be shared with other instances.
> > >> > >
> > >> > > Global state stores may help but again it requires writing to the
> > >> topic
> > >> > > which is then synced with the state stores in the instances and
> the
> > >> > > instances may not be in sync with each.
> > >> > > I am not sure if this would cause any inconsistencies since i
> don't
> > >> know
> > >> > > how the events would flow from source e.g. if admin data is
> consumed
> > >> by one
> > >> > > instance which then modified the topic but it is not yet synced to
> > >> all the
> > >> > > global state stores and the next event arrived on the main
> consumer
> > >> on a
> > >> > > different instance and it tried to read from store cache then it
> > >> doesn't
> > >> > > get the data, so the event passed on without enriched data.
> > >> > > That's pretty much about the use case.
> > >> > >
> > >> > >
> > >> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org>
> > >> wrote:
> > >> > >
> > >> > >> Hi Pushkar,
> > >> > >>
> > >> > >> I’ve been wondering if we should add writable tables to the
> Streams
> > >> api.
> > >> > >> Can you explain more about your use case and how it would
> integrate
> > >> with
> > >> > >> your application?
> > >> > >>
> > >> > >> Incidentally, this would also help us provide more concrete
> advice.
> > >> > >>
> > >> > >> Thanks!
> > >> > >> John
> > >> > >>
> > >> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > >> > >> > Both stores sever a different purpose.
> > >> > >> >
> > >> > >> > Regular stores allow you to store state the application
> computes.
> > >> > >> > Writing into the changelog is a fault-tolerance mechanism.
> > >> > >> >
> > >> > >> > Global store hold "axially" data that is provided from
> "outside"
> > >> of the
> > >> > >> > app. There is no changelog topic, but only the input topic
> (that
> > >> is used
> > >> > >> > to re-create the global state).
> > >> > >> >
> > >> > >> > Local stores are sharded and updates are "sync" as they don't
> need
> > >> to be
> > >> > >> > shared with anybody else.
> > >> > >> >
> > >> > >> > For global stores, as all instances need to be updated,
> updates are
> > >> > >> > async (we don't know when which instance will update it's own
> > >> global
> > >> > >> > store replica).
> > >> > >> >
> > >> > >> > >> Say one stream thread updates the topic for global store and
> > >> starts
> > >> > >> > >> processing next event wherein the processor tries to read
> the
> > >> global
> > >> > >> store
> > >> > >> > >> which may not have been synced with the topic?
> > >> > >> >
> > >> > >> > Correct. There is no guarantee when the update to the global
> store
> > >> will
> > >> > >> > be applied. As said, global stores are not designed to hold
> data
> > >> the
> > >> > >> > application computes.
> > >> > >> >
> > >> > >> >
> > >> > >> > -Matthias
> > >> > >> >
> > >> > >> >
> > >> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > >> > >> > > thanks... will try with GlobalKTable.
> > >> > >> > > As a side question, I didn't really understand the
> significance
> > >> of
> > >> > >> global
> > >> > >> > > state store which kind of works in a reverse way to local
> state
> > >> store
> > >> > >> i.e.
> > >> > >> > > local state store is updated and then saved to changelog
> topic
> > >> > >> whereas in
> > >> > >> > > case of global state store the topic is updated first and
> then
> > >> synced
> > >> > >> to
> > >> > >> > > global state store. Do these two work in sync i.e. the
> update to
> > >> > >> topic and
> > >> > >> > > global state store ?
> > >> > >> > >
> > >> > >> > > Say one stream thread updates the topic for global store and
> > >> starts
> > >> > >> > > processing next event wherein the processor tries to read the
> > >> global
> > >> > >> store
> > >> > >> > > which may not have been synced with the topic?
> > >> > >> > >
> > >> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <
> mjsax@apache.org
> > >> >
> > >> > >> wrote:
> > >> > >> > >
> > >> > >> > >> Yes.
> > >> > >> > >>
> > >> > >> > >> A `GlobalKTable` uses a global store internally.
> > >> > >> > >>
> > >> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > >> > >> > >> `Topology.addGlobalStore()` to add a global store
> "manually".
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >> -Matthias
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > >> > >> > >>> Thanks Matthias.
> > >> > >> > >>> Can you elaborate on the replicated caching layer part?
> > >> > >> > >>> When you say global stores, do you mean GlobalKTable
> created
> > >> from a
> > >> > >> topic
> > >> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method
> ?
> > >> > >> > >>>
> > >> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> > >> mjsax@apache.org>
> > >> > >> > >> wrote:
> > >> > >> > >>>
> > >> > >> > >>>> It's not possible to modify state store from "outside".
> > >> > >> > >>>>
> > >> > >> > >>>> If you want to build a "replicated caching layer", you
> could
> > >> use
> > >> > >> global
> > >> > >> > >>>> stores and write into the corresponding topics to update
> all
> > >> > >> stores. Of
> > >> > >> > >>>> course, those updates would be async.
> > >> > >> > >>>>
> > >> > >> > >>>>
> > >> > >> > >>>> -Matthias
> > >> > >> > >>>>
> > >> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > >> > >> > >>>>> Hi All,
> > >> > >> > >>>>>
> > >> > >> > >>>>> I am wondering if this is possible: i have been asked to
> use
> > >> state
> > >> > >> > >> stores
> > >> > >> > >>>>> as a general replicated cache among multiple instances of
> > >> service
> > >> > >> > >>>> instances
> > >> > >> > >>>>> however the state store is created through streambuilder
> but
> > >> is
> > >> > >> not
> > >> > >> > >>>>> actually modified through stream processor topology
> however
> > >> it is
> > >> > >> to be
> > >> > >> > >>>>> modified from outside the stream topology. So,
> essentially,
> > >> the
> > >> > >> state
> > >> > >> > >>>> store
> > >> > >> > >>>>> is just to be created from streambuilder and then to be
> used
> > >> as an
> > >> > >> > >>>>> application level cache that will get replicated between
> > >> > >> application
> > >> > >> > >>>>> instances. Is this possible using state stores?
> > >> > >> > >>>>>
> > >> > >> > >>>>> Secondly, if possible, is this a good design approach?
> > >> > >> > >>>>>
> > >> > >> > >>>>> Appreciate your response since I don't know the
> internals of
> > >> state
> > >> > >> > >>>> stores.
> > >> > >> > >>>>>
> > >> > >> > >>>>
> > >> > >> > >>>>
> > >> > >> > >>>
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >
> > >> > >> >
> > >> > >> >
> > >> > >> > Attachments:
> > >> > >> > * signature.asc
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >
> >
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by John Roesler <vv...@apache.org>.
Hi Pushkar,

I’m glad you’ve been able to work through the issues.

The GlobalKTable does store the data in memory (or on disk, depending how you configure it). I think the in-memory version uses a TreeMap, which is logarithmic time access. I think you’ll find it sufficiently fast regardless. 

Thanks,
John

On Mon, May 11, 2020, at 06:51, Pushkar Deole wrote:
> John,
> 
> I think I can get the cache structure modified to make use of GlobalKTable
> here so the data can be shared across. I could get information that the
> admin data will be uploaded well in advance before main events so the issue
> with 'missed joins' won't exists since by the time main events start
> flowing, the admin data has been synchronized to all the service instances.
> 
> By the way, I would like to ask you how GlobalKTable works internally, does
> it store all data in memory or it gets it from the backed topic everytime?
> Secondly, what kind of internal data structure does it use? Is it good for
> constant time performance?
> 
> On Thu, May 7, 2020 at 7:27 PM John Roesler <vv...@apache.org> wrote:
> 
> > Hi Pushkar,
> >
> > To answer your question about tuning the global store latency, I think the
> > biggest impact thing you can do is to configure the consumer that loads the
> > data for global stores. You can pass configs specifically to the global
> > consumer with the prefix: “ global.consumer.”
> >
> > Regarding the larger situation, it seems like the global table and a
> > distributed cache would display the same basic behavior in terms of the
> > potential for missed joins. Then, it probably makes sense to go for the
> > option with fewer components to implement and maintain, which to me points
> > to the global KTable.
> >
> > Since you can anticipate that missed joins can be a problem, you can build
> > in some metrics and reporting for how many misses you actually observe, and
> > potentially redesign the app if it’s actually a problem.
> >
> > I hope this helps!
> > -John
> >
> > On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> > > Thanks John... appreciate your inputs and suggestions. I have been
> > assigned
> > > recently to this task (of persisting the cache) and haven't been involved
> > > in original design and architecture and agree with all the issues you
> > have
> > > highlighted.
> > > However, at this point, i don't think the application can be converted to
> > > streams since the design is not flexible and it would require lot of
> > > rewrite of code plus subsequent testing.
> > >
> > > My first thought was to use external database only,  preferably the
> > > distributed caching systems like Apache Ignite since it will have least
> > > impact on performance. Going to database for every event would impact the
> > > throughput a lot. Probably having distributed caching (key/value pairs)
> > > would have comparatively lesser impact.
> > > Second choice is to go for GlobalKTable however this needs to be done
> > very
> > > carefully.
> > >
> > > Thanks again!
> > >
> > > On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pd...@gmail.com>
> > wrote:
> > >
> > > > Thanks John... what parameters would affect the latency in case
> > > > GlobalKTable will be used and is there any configurations that could be
> > > > tuned to minimize the latency of sync with input topic?
> > > >
> > > > On Mon, May 4, 2020 at 10:20 PM John Roesler <vv...@apache.org>
> > wrote:
> > > >
> > > >> Hello Pushkar,
> > > >>
> > > >> Yes, that’s correct. The operation you describe is currently not
> > > >> supported. If you want to keep the structure you described in place,
> > I’d
> > > >> suggest using an external database for the admin objects. I’ll give
> > another
> > > >> idea below.
> > > >>
> > > >> With your current architecture, I’m a little concerned about data
> > races.
> > > >> From what I saw, nothing would prevent processing stream records with
> > agent
> > > >> 10 before you process the admin record with agent 10. This problem
> > will
> > > >> persist no matter where you locate the cache.
> > > >>
> > > >> GlobalKTable would no doubt make it worse, since it increases the
> > latency
> > > >> before admin record 10 is queriable everywhere.
> > > >>
> > > >> I think you’ll want to make a call between architecture simplicity
> > > >> (remote cache or global KTable) vs the probability of missed joins.
> > > >>
> > > >> I think the “best” way to solve this problem (that comes to mind
> > anyway)
> > > >> might be to
> > > >> 1. Repartition the stream to be co-partitioned with the admin records.
> > > >> 2. Do a local (not global) stream-table join
> > > >> 3. Enable task idling
> > > >>
> > > >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make
> > the
> > > >> agent Id the new key of the stream, and then use ‘through’, (where the
> > > >> intermediate topic has the same number of partitions as the admin
> > topic) to
> > > >> do the repartitioning. In 2.6, there is a “repartition” operator that
> > will
> > > >> make this easier.
> > > >>
> > > >> The repartition ensures that all stream records with agent id 10 will
> > be
> > > >> processed by the same thread that processes the admin records with
> > agent id
> > > >> 10, hence it will be able to find agent 10 in the local KTable store.
> > > >>
> > > >> Task idling will minimize your chances of missing any enrichments.
> > When a
> > > >> task has two inputs (E.g., your repartitioned stream joining with the
> > admin
> > > >> table), it makes Streams wait until both inputs are buffered before
> > > >> processing, so it can do a better job of processing in timestamp
> > order.
> > > >>
> > > >> I hope this helps!
> > > >> -John
> > > >>
> > > >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > > >> > If i understand correctly, Kafka is not designed to provide
> > replicated
> > > >> > caching mechanism wherein the updates to cache will be synchronous
> > > >> across
> > > >> > multiple cache instances.
> > > >> >
> > > >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pdeole2015@gmail.com
> > >
> > > >> wrote:
> > > >> >
> > > >> > > Thanks John.
> > > >> > >
> > > >> > > Actually, this is a normal consumer-producer application wherein
> > > >> there are
> > > >> > > 2 consumers (admin consumer and main consumer) consuming messages
> > > >> from 2
> > > >> > > different topics.
> > > >> > > One of the consumers consumes messages from a admin topic and
> > > >> populates
> > > >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> > > >> first
> > > >> > > name and last name is received is populated in cache. When the
> > other
> > > >> > > consumer consumes message and it has agent id 10 then it reads the
> > > >> cache,
> > > >> > > appends the first name and last name and then sends enriched
> > event to
> > > >> > > producer.
> > > >> > > In this case, each application instance consumes all the events
> > from
> > > >> admin
> > > >> > > topic (unique consumer id) and keeps them in the cache in memory.
> > > >> > > Now the requirement is to persist the cache and make is shared
> > > >> between the
> > > >> > > application instances, so each instance would consume partitions
> > of
> > > >> admin
> > > >> > > topic and write to admin cache.
> > > >> > >
> > > >> > > If we want to use kafka streams, the application is so much
> > evolved
> > > >> that
> > > >> > > it is difficult to migrate to streams at this stage. Secondly,
> > from
> > > >> past
> > > >> > > mail chains, streams also won't serve the requirement since local
> > > >> state
> > > >> > > stores would just hold the local state of admin data and the cache
> > > >> written
> > > >> > > by each instance won't be shared with other instances.
> > > >> > >
> > > >> > > Global state stores may help but again it requires writing to the
> > > >> topic
> > > >> > > which is then synced with the state stores in the instances and
> > the
> > > >> > > instances may not be in sync with each.
> > > >> > > I am not sure if this would cause any inconsistencies since i
> > don't
> > > >> know
> > > >> > > how the events would flow from source e.g. if admin data is
> > consumed
> > > >> by one
> > > >> > > instance which then modified the topic but it is not yet synced to
> > > >> all the
> > > >> > > global state stores and the next event arrived on the main
> > consumer
> > > >> on a
> > > >> > > different instance and it tried to read from store cache then it
> > > >> doesn't
> > > >> > > get the data, so the event passed on without enriched data.
> > > >> > > That's pretty much about the use case.
> > > >> > >
> > > >> > >
> > > >> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org>
> > > >> wrote:
> > > >> > >
> > > >> > >> Hi Pushkar,
> > > >> > >>
> > > >> > >> I’ve been wondering if we should add writable tables to the
> > Streams
> > > >> api.
> > > >> > >> Can you explain more about your use case and how it would
> > integrate
> > > >> with
> > > >> > >> your application?
> > > >> > >>
> > > >> > >> Incidentally, this would also help us provide more concrete
> > advice.
> > > >> > >>
> > > >> > >> Thanks!
> > > >> > >> John
> > > >> > >>
> > > >> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > > >> > >> > Both stores sever a different purpose.
> > > >> > >> >
> > > >> > >> > Regular stores allow you to store state the application
> > computes.
> > > >> > >> > Writing into the changelog is a fault-tolerance mechanism.
> > > >> > >> >
> > > >> > >> > Global store hold "axially" data that is provided from
> > "outside"
> > > >> of the
> > > >> > >> > app. There is no changelog topic, but only the input topic
> > (that
> > > >> is used
> > > >> > >> > to re-create the global state).
> > > >> > >> >
> > > >> > >> > Local stores are sharded and updates are "sync" as they don't
> > need
> > > >> to be
> > > >> > >> > shared with anybody else.
> > > >> > >> >
> > > >> > >> > For global stores, as all instances need to be updated,
> > updates are
> > > >> > >> > async (we don't know when which instance will update it's own
> > > >> global
> > > >> > >> > store replica).
> > > >> > >> >
> > > >> > >> > >> Say one stream thread updates the topic for global store and
> > > >> starts
> > > >> > >> > >> processing next event wherein the processor tries to read
> > the
> > > >> global
> > > >> > >> store
> > > >> > >> > >> which may not have been synced with the topic?
> > > >> > >> >
> > > >> > >> > Correct. There is no guarantee when the update to the global
> > store
> > > >> will
> > > >> > >> > be applied. As said, global stores are not designed to hold
> > data
> > > >> the
> > > >> > >> > application computes.
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > -Matthias
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > > >> > >> > > thanks... will try with GlobalKTable.
> > > >> > >> > > As a side question, I didn't really understand the
> > significance
> > > >> of
> > > >> > >> global
> > > >> > >> > > state store which kind of works in a reverse way to local
> > state
> > > >> store
> > > >> > >> i.e.
> > > >> > >> > > local state store is updated and then saved to changelog
> > topic
> > > >> > >> whereas in
> > > >> > >> > > case of global state store the topic is updated first and
> > then
> > > >> synced
> > > >> > >> to
> > > >> > >> > > global state store. Do these two work in sync i.e. the
> > update to
> > > >> > >> topic and
> > > >> > >> > > global state store ?
> > > >> > >> > >
> > > >> > >> > > Say one stream thread updates the topic for global store and
> > > >> starts
> > > >> > >> > > processing next event wherein the processor tries to read the
> > > >> global
> > > >> > >> store
> > > >> > >> > > which may not have been synced with the topic?
> > > >> > >> > >
> > > >> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <
> > mjsax@apache.org
> > > >> >
> > > >> > >> wrote:
> > > >> > >> > >
> > > >> > >> > >> Yes.
> > > >> > >> > >>
> > > >> > >> > >> A `GlobalKTable` uses a global store internally.
> > > >> > >> > >>
> > > >> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > > >> > >> > >> `Topology.addGlobalStore()` to add a global store
> > "manually".
> > > >> > >> > >>
> > > >> > >> > >>
> > > >> > >> > >> -Matthias
> > > >> > >> > >>
> > > >> > >> > >>
> > > >> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > > >> > >> > >>> Thanks Matthias.
> > > >> > >> > >>> Can you elaborate on the replicated caching layer part?
> > > >> > >> > >>> When you say global stores, do you mean GlobalKTable
> > created
> > > >> from a
> > > >> > >> topic
> > > >> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method
> > ?
> > > >> > >> > >>>
> > > >> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> > > >> mjsax@apache.org>
> > > >> > >> > >> wrote:
> > > >> > >> > >>>
> > > >> > >> > >>>> It's not possible to modify state store from "outside".
> > > >> > >> > >>>>
> > > >> > >> > >>>> If you want to build a "replicated caching layer", you
> > could
> > > >> use
> > > >> > >> global
> > > >> > >> > >>>> stores and write into the corresponding topics to update
> > all
> > > >> > >> stores. Of
> > > >> > >> > >>>> course, those updates would be async.
> > > >> > >> > >>>>
> > > >> > >> > >>>>
> > > >> > >> > >>>> -Matthias
> > > >> > >> > >>>>
> > > >> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > > >> > >> > >>>>> Hi All,
> > > >> > >> > >>>>>
> > > >> > >> > >>>>> I am wondering if this is possible: i have been asked to
> > use
> > > >> state
> > > >> > >> > >> stores
> > > >> > >> > >>>>> as a general replicated cache among multiple instances of
> > > >> service
> > > >> > >> > >>>> instances
> > > >> > >> > >>>>> however the state store is created through streambuilder
> > but
> > > >> is
> > > >> > >> not
> > > >> > >> > >>>>> actually modified through stream processor topology
> > however
> > > >> it is
> > > >> > >> to be
> > > >> > >> > >>>>> modified from outside the stream topology. So,
> > essentially,
> > > >> the
> > > >> > >> state
> > > >> > >> > >>>> store
> > > >> > >> > >>>>> is just to be created from streambuilder and then to be
> > used
> > > >> as an
> > > >> > >> > >>>>> application level cache that will get replicated between
> > > >> > >> application
> > > >> > >> > >>>>> instances. Is this possible using state stores?
> > > >> > >> > >>>>>
> > > >> > >> > >>>>> Secondly, if possible, is this a good design approach?
> > > >> > >> > >>>>>
> > > >> > >> > >>>>> Appreciate your response since I don't know the
> > internals of
> > > >> state
> > > >> > >> > >>>> stores.
> > > >> > >> > >>>>>
> > > >> > >> > >>>>
> > > >> > >> > >>>>
> > > >> > >> > >>>
> > > >> > >> > >>
> > > >> > >> > >>
> > > >> > >> > >
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > Attachments:
> > > >> > >> > * signature.asc
> > > >> > >>
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
John,

I think I can get the cache structure modified to make use of GlobalKTable
here so the data can be shared across. I could get information that the
admin data will be uploaded well in advance before main events so the issue
with 'missed joins' won't exists since by the time main events start
flowing, the admin data has been synchronized to all the service instances.

By the way, I would like to ask you how GlobalKTable works internally, does
it store all data in memory or it gets it from the backed topic everytime?
Secondly, what kind of internal data structure does it use? Is it good for
constant time performance?

On Thu, May 7, 2020 at 7:27 PM John Roesler <vv...@apache.org> wrote:

> Hi Pushkar,
>
> To answer your question about tuning the global store latency, I think the
> biggest impact thing you can do is to configure the consumer that loads the
> data for global stores. You can pass configs specifically to the global
> consumer with the prefix: “ global.consumer.”
>
> Regarding the larger situation, it seems like the global table and a
> distributed cache would display the same basic behavior in terms of the
> potential for missed joins. Then, it probably makes sense to go for the
> option with fewer components to implement and maintain, which to me points
> to the global KTable.
>
> Since you can anticipate that missed joins can be a problem, you can build
> in some metrics and reporting for how many misses you actually observe, and
> potentially redesign the app if it’s actually a problem.
>
> I hope this helps!
> -John
>
> On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> > Thanks John... appreciate your inputs and suggestions. I have been
> assigned
> > recently to this task (of persisting the cache) and haven't been involved
> > in original design and architecture and agree with all the issues you
> have
> > highlighted.
> > However, at this point, i don't think the application can be converted to
> > streams since the design is not flexible and it would require lot of
> > rewrite of code plus subsequent testing.
> >
> > My first thought was to use external database only,  preferably the
> > distributed caching systems like Apache Ignite since it will have least
> > impact on performance. Going to database for every event would impact the
> > throughput a lot. Probably having distributed caching (key/value pairs)
> > would have comparatively lesser impact.
> > Second choice is to go for GlobalKTable however this needs to be done
> very
> > carefully.
> >
> > Thanks again!
> >
> > On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pd...@gmail.com>
> wrote:
> >
> > > Thanks John... what parameters would affect the latency in case
> > > GlobalKTable will be used and is there any configurations that could be
> > > tuned to minimize the latency of sync with input topic?
> > >
> > > On Mon, May 4, 2020 at 10:20 PM John Roesler <vv...@apache.org>
> wrote:
> > >
> > >> Hello Pushkar,
> > >>
> > >> Yes, that’s correct. The operation you describe is currently not
> > >> supported. If you want to keep the structure you described in place,
> I’d
> > >> suggest using an external database for the admin objects. I’ll give
> another
> > >> idea below.
> > >>
> > >> With your current architecture, I’m a little concerned about data
> races.
> > >> From what I saw, nothing would prevent processing stream records with
> agent
> > >> 10 before you process the admin record with agent 10. This problem
> will
> > >> persist no matter where you locate the cache.
> > >>
> > >> GlobalKTable would no doubt make it worse, since it increases the
> latency
> > >> before admin record 10 is queriable everywhere.
> > >>
> > >> I think you’ll want to make a call between architecture simplicity
> > >> (remote cache or global KTable) vs the probability of missed joins.
> > >>
> > >> I think the “best” way to solve this problem (that comes to mind
> anyway)
> > >> might be to
> > >> 1. Repartition the stream to be co-partitioned with the admin records.
> > >> 2. Do a local (not global) stream-table join
> > >> 3. Enable task idling
> > >>
> > >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make
> the
> > >> agent Id the new key of the stream, and then use ‘through’, (where the
> > >> intermediate topic has the same number of partitions as the admin
> topic) to
> > >> do the repartitioning. In 2.6, there is a “repartition” operator that
> will
> > >> make this easier.
> > >>
> > >> The repartition ensures that all stream records with agent id 10 will
> be
> > >> processed by the same thread that processes the admin records with
> agent id
> > >> 10, hence it will be able to find agent 10 in the local KTable store.
> > >>
> > >> Task idling will minimize your chances of missing any enrichments.
> When a
> > >> task has two inputs (E.g., your repartitioned stream joining with the
> admin
> > >> table), it makes Streams wait until both inputs are buffered before
> > >> processing, so it can do a better job of processing in timestamp
> order.
> > >>
> > >> I hope this helps!
> > >> -John
> > >>
> > >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > >> > If i understand correctly, Kafka is not designed to provide
> replicated
> > >> > caching mechanism wherein the updates to cache will be synchronous
> > >> across
> > >> > multiple cache instances.
> > >> >
> > >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pdeole2015@gmail.com
> >
> > >> wrote:
> > >> >
> > >> > > Thanks John.
> > >> > >
> > >> > > Actually, this is a normal consumer-producer application wherein
> > >> there are
> > >> > > 2 consumers (admin consumer and main consumer) consuming messages
> > >> from 2
> > >> > > different topics.
> > >> > > One of the consumers consumes messages from a admin topic and
> > >> populates
> > >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> > >> first
> > >> > > name and last name is received is populated in cache. When the
> other
> > >> > > consumer consumes message and it has agent id 10 then it reads the
> > >> cache,
> > >> > > appends the first name and last name and then sends enriched
> event to
> > >> > > producer.
> > >> > > In this case, each application instance consumes all the events
> from
> > >> admin
> > >> > > topic (unique consumer id) and keeps them in the cache in memory.
> > >> > > Now the requirement is to persist the cache and make is shared
> > >> between the
> > >> > > application instances, so each instance would consume partitions
> of
> > >> admin
> > >> > > topic and write to admin cache.
> > >> > >
> > >> > > If we want to use kafka streams, the application is so much
> evolved
> > >> that
> > >> > > it is difficult to migrate to streams at this stage. Secondly,
> from
> > >> past
> > >> > > mail chains, streams also won't serve the requirement since local
> > >> state
> > >> > > stores would just hold the local state of admin data and the cache
> > >> written
> > >> > > by each instance won't be shared with other instances.
> > >> > >
> > >> > > Global state stores may help but again it requires writing to the
> > >> topic
> > >> > > which is then synced with the state stores in the instances and
> the
> > >> > > instances may not be in sync with each.
> > >> > > I am not sure if this would cause any inconsistencies since i
> don't
> > >> know
> > >> > > how the events would flow from source e.g. if admin data is
> consumed
> > >> by one
> > >> > > instance which then modified the topic but it is not yet synced to
> > >> all the
> > >> > > global state stores and the next event arrived on the main
> consumer
> > >> on a
> > >> > > different instance and it tried to read from store cache then it
> > >> doesn't
> > >> > > get the data, so the event passed on without enriched data.
> > >> > > That's pretty much about the use case.
> > >> > >
> > >> > >
> > >> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org>
> > >> wrote:
> > >> > >
> > >> > >> Hi Pushkar,
> > >> > >>
> > >> > >> I’ve been wondering if we should add writable tables to the
> Streams
> > >> api.
> > >> > >> Can you explain more about your use case and how it would
> integrate
> > >> with
> > >> > >> your application?
> > >> > >>
> > >> > >> Incidentally, this would also help us provide more concrete
> advice.
> > >> > >>
> > >> > >> Thanks!
> > >> > >> John
> > >> > >>
> > >> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > >> > >> > Both stores sever a different purpose.
> > >> > >> >
> > >> > >> > Regular stores allow you to store state the application
> computes.
> > >> > >> > Writing into the changelog is a fault-tolerance mechanism.
> > >> > >> >
> > >> > >> > Global store hold "axially" data that is provided from
> "outside"
> > >> of the
> > >> > >> > app. There is no changelog topic, but only the input topic
> (that
> > >> is used
> > >> > >> > to re-create the global state).
> > >> > >> >
> > >> > >> > Local stores are sharded and updates are "sync" as they don't
> need
> > >> to be
> > >> > >> > shared with anybody else.
> > >> > >> >
> > >> > >> > For global stores, as all instances need to be updated,
> updates are
> > >> > >> > async (we don't know when which instance will update it's own
> > >> global
> > >> > >> > store replica).
> > >> > >> >
> > >> > >> > >> Say one stream thread updates the topic for global store and
> > >> starts
> > >> > >> > >> processing next event wherein the processor tries to read
> the
> > >> global
> > >> > >> store
> > >> > >> > >> which may not have been synced with the topic?
> > >> > >> >
> > >> > >> > Correct. There is no guarantee when the update to the global
> store
> > >> will
> > >> > >> > be applied. As said, global stores are not designed to hold
> data
> > >> the
> > >> > >> > application computes.
> > >> > >> >
> > >> > >> >
> > >> > >> > -Matthias
> > >> > >> >
> > >> > >> >
> > >> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > >> > >> > > thanks... will try with GlobalKTable.
> > >> > >> > > As a side question, I didn't really understand the
> significance
> > >> of
> > >> > >> global
> > >> > >> > > state store which kind of works in a reverse way to local
> state
> > >> store
> > >> > >> i.e.
> > >> > >> > > local state store is updated and then saved to changelog
> topic
> > >> > >> whereas in
> > >> > >> > > case of global state store the topic is updated first and
> then
> > >> synced
> > >> > >> to
> > >> > >> > > global state store. Do these two work in sync i.e. the
> update to
> > >> > >> topic and
> > >> > >> > > global state store ?
> > >> > >> > >
> > >> > >> > > Say one stream thread updates the topic for global store and
> > >> starts
> > >> > >> > > processing next event wherein the processor tries to read the
> > >> global
> > >> > >> store
> > >> > >> > > which may not have been synced with the topic?
> > >> > >> > >
> > >> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <
> mjsax@apache.org
> > >> >
> > >> > >> wrote:
> > >> > >> > >
> > >> > >> > >> Yes.
> > >> > >> > >>
> > >> > >> > >> A `GlobalKTable` uses a global store internally.
> > >> > >> > >>
> > >> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > >> > >> > >> `Topology.addGlobalStore()` to add a global store
> "manually".
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >> -Matthias
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > >> > >> > >>> Thanks Matthias.
> > >> > >> > >>> Can you elaborate on the replicated caching layer part?
> > >> > >> > >>> When you say global stores, do you mean GlobalKTable
> created
> > >> from a
> > >> > >> topic
> > >> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method
> ?
> > >> > >> > >>>
> > >> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> > >> mjsax@apache.org>
> > >> > >> > >> wrote:
> > >> > >> > >>>
> > >> > >> > >>>> It's not possible to modify state store from "outside".
> > >> > >> > >>>>
> > >> > >> > >>>> If you want to build a "replicated caching layer", you
> could
> > >> use
> > >> > >> global
> > >> > >> > >>>> stores and write into the corresponding topics to update
> all
> > >> > >> stores. Of
> > >> > >> > >>>> course, those updates would be async.
> > >> > >> > >>>>
> > >> > >> > >>>>
> > >> > >> > >>>> -Matthias
> > >> > >> > >>>>
> > >> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > >> > >> > >>>>> Hi All,
> > >> > >> > >>>>>
> > >> > >> > >>>>> I am wondering if this is possible: i have been asked to
> use
> > >> state
> > >> > >> > >> stores
> > >> > >> > >>>>> as a general replicated cache among multiple instances of
> > >> service
> > >> > >> > >>>> instances
> > >> > >> > >>>>> however the state store is created through streambuilder
> but
> > >> is
> > >> > >> not
> > >> > >> > >>>>> actually modified through stream processor topology
> however
> > >> it is
> > >> > >> to be
> > >> > >> > >>>>> modified from outside the stream topology. So,
> essentially,
> > >> the
> > >> > >> state
> > >> > >> > >>>> store
> > >> > >> > >>>>> is just to be created from streambuilder and then to be
> used
> > >> as an
> > >> > >> > >>>>> application level cache that will get replicated between
> > >> > >> application
> > >> > >> > >>>>> instances. Is this possible using state stores?
> > >> > >> > >>>>>
> > >> > >> > >>>>> Secondly, if possible, is this a good design approach?
> > >> > >> > >>>>>
> > >> > >> > >>>>> Appreciate your response since I don't know the
> internals of
> > >> state
> > >> > >> > >>>> stores.
> > >> > >> > >>>>>
> > >> > >> > >>>>
> > >> > >> > >>>>
> > >> > >> > >>>
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >
> > >> > >> >
> > >> > >> >
> > >> > >> > Attachments:
> > >> > >> > * signature.asc
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >
> >
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by John Roesler <vv...@apache.org>.
Hi Pushkar,

To answer your question about tuning the global store latency, I think the biggest impact thing you can do is to configure the consumer that loads the data for global stores. You can pass configs specifically to the global consumer with the prefix: “ global.consumer.”

Regarding the larger situation, it seems like the global table and a distributed cache would display the same basic behavior in terms of the potential for missed joins. Then, it probably makes sense to go for the option with fewer components to implement and maintain, which to me points to the global KTable. 

Since you can anticipate that missed joins can be a problem, you can build in some metrics and reporting for how many misses you actually observe, and potentially redesign the app if it’s actually a problem. 

I hope this helps!
-John 

On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> Thanks John... appreciate your inputs and suggestions. I have been assigned
> recently to this task (of persisting the cache) and haven't been involved
> in original design and architecture and agree with all the issues you have
> highlighted.
> However, at this point, i don't think the application can be converted to
> streams since the design is not flexible and it would require lot of
> rewrite of code plus subsequent testing.
> 
> My first thought was to use external database only,  preferably the
> distributed caching systems like Apache Ignite since it will have least
> impact on performance. Going to database for every event would impact the
> throughput a lot. Probably having distributed caching (key/value pairs)
> would have comparatively lesser impact.
> Second choice is to go for GlobalKTable however this needs to be done very
> carefully.
> 
> Thanks again!
> 
> On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pd...@gmail.com> wrote:
> 
> > Thanks John... what parameters would affect the latency in case
> > GlobalKTable will be used and is there any configurations that could be
> > tuned to minimize the latency of sync with input topic?
> >
> > On Mon, May 4, 2020 at 10:20 PM John Roesler <vv...@apache.org> wrote:
> >
> >> Hello Pushkar,
> >>
> >> Yes, that’s correct. The operation you describe is currently not
> >> supported. If you want to keep the structure you described in place, I’d
> >> suggest using an external database for the admin objects. I’ll give another
> >> idea below.
> >>
> >> With your current architecture, I’m a little concerned about data races.
> >> From what I saw, nothing would prevent processing stream records with agent
> >> 10 before you process the admin record with agent 10. This problem will
> >> persist no matter where you locate the cache.
> >>
> >> GlobalKTable would no doubt make it worse, since it increases the latency
> >> before admin record 10 is queriable everywhere.
> >>
> >> I think you’ll want to make a call between architecture simplicity
> >> (remote cache or global KTable) vs the probability of missed joins.
> >>
> >> I think the “best” way to solve this problem (that comes to mind anyway)
> >> might be to
> >> 1. Repartition the stream to be co-partitioned with the admin records.
> >> 2. Do a local (not global) stream-table join
> >> 3. Enable task idling
> >>
> >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
> >> agent Id the new key of the stream, and then use ‘through’, (where the
> >> intermediate topic has the same number of partitions as the admin topic) to
> >> do the repartitioning. In 2.6, there is a “repartition” operator that will
> >> make this easier.
> >>
> >> The repartition ensures that all stream records with agent id 10 will be
> >> processed by the same thread that processes the admin records with agent id
> >> 10, hence it will be able to find agent 10 in the local KTable store.
> >>
> >> Task idling will minimize your chances of missing any enrichments. When a
> >> task has two inputs (E.g., your repartitioned stream joining with the admin
> >> table), it makes Streams wait until both inputs are buffered before
> >> processing, so it can do a better job of processing in timestamp order.
> >>
> >> I hope this helps!
> >> -John
> >>
> >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> >> > If i understand correctly, Kafka is not designed to provide replicated
> >> > caching mechanism wherein the updates to cache will be synchronous
> >> across
> >> > multiple cache instances.
> >> >
> >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pd...@gmail.com>
> >> wrote:
> >> >
> >> > > Thanks John.
> >> > >
> >> > > Actually, this is a normal consumer-producer application wherein
> >> there are
> >> > > 2 consumers (admin consumer and main consumer) consuming messages
> >> from 2
> >> > > different topics.
> >> > > One of the consumers consumes messages from a admin topic and
> >> populates
> >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> >> first
> >> > > name and last name is received is populated in cache. When the other
> >> > > consumer consumes message and it has agent id 10 then it reads the
> >> cache,
> >> > > appends the first name and last name and then sends enriched event to
> >> > > producer.
> >> > > In this case, each application instance consumes all the events from
> >> admin
> >> > > topic (unique consumer id) and keeps them in the cache in memory.
> >> > > Now the requirement is to persist the cache and make is shared
> >> between the
> >> > > application instances, so each instance would consume partitions of
> >> admin
> >> > > topic and write to admin cache.
> >> > >
> >> > > If we want to use kafka streams, the application is so much evolved
> >> that
> >> > > it is difficult to migrate to streams at this stage. Secondly, from
> >> past
> >> > > mail chains, streams also won't serve the requirement since local
> >> state
> >> > > stores would just hold the local state of admin data and the cache
> >> written
> >> > > by each instance won't be shared with other instances.
> >> > >
> >> > > Global state stores may help but again it requires writing to the
> >> topic
> >> > > which is then synced with the state stores in the instances and the
> >> > > instances may not be in sync with each.
> >> > > I am not sure if this would cause any inconsistencies since i don't
> >> know
> >> > > how the events would flow from source e.g. if admin data is consumed
> >> by one
> >> > > instance which then modified the topic but it is not yet synced to
> >> all the
> >> > > global state stores and the next event arrived on the main consumer
> >> on a
> >> > > different instance and it tried to read from store cache then it
> >> doesn't
> >> > > get the data, so the event passed on without enriched data.
> >> > > That's pretty much about the use case.
> >> > >
> >> > >
> >> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org>
> >> wrote:
> >> > >
> >> > >> Hi Pushkar,
> >> > >>
> >> > >> I’ve been wondering if we should add writable tables to the Streams
> >> api.
> >> > >> Can you explain more about your use case and how it would integrate
> >> with
> >> > >> your application?
> >> > >>
> >> > >> Incidentally, this would also help us provide more concrete advice.
> >> > >>
> >> > >> Thanks!
> >> > >> John
> >> > >>
> >> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> >> > >> > Both stores sever a different purpose.
> >> > >> >
> >> > >> > Regular stores allow you to store state the application computes.
> >> > >> > Writing into the changelog is a fault-tolerance mechanism.
> >> > >> >
> >> > >> > Global store hold "axially" data that is provided from "outside"
> >> of the
> >> > >> > app. There is no changelog topic, but only the input topic (that
> >> is used
> >> > >> > to re-create the global state).
> >> > >> >
> >> > >> > Local stores are sharded and updates are "sync" as they don't need
> >> to be
> >> > >> > shared with anybody else.
> >> > >> >
> >> > >> > For global stores, as all instances need to be updated, updates are
> >> > >> > async (we don't know when which instance will update it's own
> >> global
> >> > >> > store replica).
> >> > >> >
> >> > >> > >> Say one stream thread updates the topic for global store and
> >> starts
> >> > >> > >> processing next event wherein the processor tries to read the
> >> global
> >> > >> store
> >> > >> > >> which may not have been synced with the topic?
> >> > >> >
> >> > >> > Correct. There is no guarantee when the update to the global store
> >> will
> >> > >> > be applied. As said, global stores are not designed to hold data
> >> the
> >> > >> > application computes.
> >> > >> >
> >> > >> >
> >> > >> > -Matthias
> >> > >> >
> >> > >> >
> >> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> >> > >> > > thanks... will try with GlobalKTable.
> >> > >> > > As a side question, I didn't really understand the significance
> >> of
> >> > >> global
> >> > >> > > state store which kind of works in a reverse way to local state
> >> store
> >> > >> i.e.
> >> > >> > > local state store is updated and then saved to changelog topic
> >> > >> whereas in
> >> > >> > > case of global state store the topic is updated first and then
> >> synced
> >> > >> to
> >> > >> > > global state store. Do these two work in sync i.e. the update to
> >> > >> topic and
> >> > >> > > global state store ?
> >> > >> > >
> >> > >> > > Say one stream thread updates the topic for global store and
> >> starts
> >> > >> > > processing next event wherein the processor tries to read the
> >> global
> >> > >> store
> >> > >> > > which may not have been synced with the topic?
> >> > >> > >
> >> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mjsax@apache.org
> >> >
> >> > >> wrote:
> >> > >> > >
> >> > >> > >> Yes.
> >> > >> > >>
> >> > >> > >> A `GlobalKTable` uses a global store internally.
> >> > >> > >>
> >> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> >> > >> > >> `Topology.addGlobalStore()` to add a global store "manually".
> >> > >> > >>
> >> > >> > >>
> >> > >> > >> -Matthias
> >> > >> > >>
> >> > >> > >>
> >> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >> > >> > >>> Thanks Matthias.
> >> > >> > >>> Can you elaborate on the replicated caching layer part?
> >> > >> > >>> When you say global stores, do you mean GlobalKTable created
> >> from a
> >> > >> topic
> >> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >> > >> > >>>
> >> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> >> mjsax@apache.org>
> >> > >> > >> wrote:
> >> > >> > >>>
> >> > >> > >>>> It's not possible to modify state store from "outside".
> >> > >> > >>>>
> >> > >> > >>>> If you want to build a "replicated caching layer", you could
> >> use
> >> > >> global
> >> > >> > >>>> stores and write into the corresponding topics to update all
> >> > >> stores. Of
> >> > >> > >>>> course, those updates would be async.
> >> > >> > >>>>
> >> > >> > >>>>
> >> > >> > >>>> -Matthias
> >> > >> > >>>>
> >> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >> > >> > >>>>> Hi All,
> >> > >> > >>>>>
> >> > >> > >>>>> I am wondering if this is possible: i have been asked to use
> >> state
> >> > >> > >> stores
> >> > >> > >>>>> as a general replicated cache among multiple instances of
> >> service
> >> > >> > >>>> instances
> >> > >> > >>>>> however the state store is created through streambuilder but
> >> is
> >> > >> not
> >> > >> > >>>>> actually modified through stream processor topology however
> >> it is
> >> > >> to be
> >> > >> > >>>>> modified from outside the stream topology. So, essentially,
> >> the
> >> > >> state
> >> > >> > >>>> store
> >> > >> > >>>>> is just to be created from streambuilder and then to be used
> >> as an
> >> > >> > >>>>> application level cache that will get replicated between
> >> > >> application
> >> > >> > >>>>> instances. Is this possible using state stores?
> >> > >> > >>>>>
> >> > >> > >>>>> Secondly, if possible, is this a good design approach?
> >> > >> > >>>>>
> >> > >> > >>>>> Appreciate your response since I don't know the internals of
> >> state
> >> > >> > >>>> stores.
> >> > >> > >>>>>
> >> > >> > >>>>
> >> > >> > >>>>
> >> > >> > >>>
> >> > >> > >>
> >> > >> > >>
> >> > >> > >
> >> > >> >
> >> > >> >
> >> > >> > Attachments:
> >> > >> > * signature.asc
> >> > >>
> >> > >
> >> >
> >>
> >
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks John... appreciate your inputs and suggestions. I have been assigned
recently to this task (of persisting the cache) and haven't been involved
in original design and architecture and agree with all the issues you have
highlighted.
However, at this point, i don't think the application can be converted to
streams since the design is not flexible and it would require lot of
rewrite of code plus subsequent testing.

My first thought was to use external database only,  preferably the
distributed caching systems like Apache Ignite since it will have least
impact on performance. Going to database for every event would impact the
throughput a lot. Probably having distributed caching (key/value pairs)
would have comparatively lesser impact.
Second choice is to go for GlobalKTable however this needs to be done very
carefully.

Thanks again!

On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pd...@gmail.com> wrote:

> Thanks John... what parameters would affect the latency in case
> GlobalKTable will be used and is there any configurations that could be
> tuned to minimize the latency of sync with input topic?
>
> On Mon, May 4, 2020 at 10:20 PM John Roesler <vv...@apache.org> wrote:
>
>> Hello Pushkar,
>>
>> Yes, that’s correct. The operation you describe is currently not
>> supported. If you want to keep the structure you described in place, I’d
>> suggest using an external database for the admin objects. I’ll give another
>> idea below.
>>
>> With your current architecture, I’m a little concerned about data races.
>> From what I saw, nothing would prevent processing stream records with agent
>> 10 before you process the admin record with agent 10. This problem will
>> persist no matter where you locate the cache.
>>
>> GlobalKTable would no doubt make it worse, since it increases the latency
>> before admin record 10 is queriable everywhere.
>>
>> I think you’ll want to make a call between architecture simplicity
>> (remote cache or global KTable) vs the probability of missed joins.
>>
>> I think the “best” way to solve this problem (that comes to mind anyway)
>> might be to
>> 1. Repartition the stream to be co-partitioned with the admin records.
>> 2. Do a local (not global) stream-table join
>> 3. Enable task idling
>>
>> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
>> agent Id the new key of the stream, and then use ‘through’, (where the
>> intermediate topic has the same number of partitions as the admin topic) to
>> do the repartitioning. In 2.6, there is a “repartition” operator that will
>> make this easier.
>>
>> The repartition ensures that all stream records with agent id 10 will be
>> processed by the same thread that processes the admin records with agent id
>> 10, hence it will be able to find agent 10 in the local KTable store.
>>
>> Task idling will minimize your chances of missing any enrichments. When a
>> task has two inputs (E.g., your repartitioned stream joining with the admin
>> table), it makes Streams wait until both inputs are buffered before
>> processing, so it can do a better job of processing in timestamp order.
>>
>> I hope this helps!
>> -John
>>
>> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
>> > If i understand correctly, Kafka is not designed to provide replicated
>> > caching mechanism wherein the updates to cache will be synchronous
>> across
>> > multiple cache instances.
>> >
>> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pd...@gmail.com>
>> wrote:
>> >
>> > > Thanks John.
>> > >
>> > > Actually, this is a normal consumer-producer application wherein
>> there are
>> > > 2 consumers (admin consumer and main consumer) consuming messages
>> from 2
>> > > different topics.
>> > > One of the consumers consumes messages from a admin topic and
>> populates
>> > > data in a cache e.g. lets say agent with agent id 10 for which the
>> first
>> > > name and last name is received is populated in cache. When the other
>> > > consumer consumes message and it has agent id 10 then it reads the
>> cache,
>> > > appends the first name and last name and then sends enriched event to
>> > > producer.
>> > > In this case, each application instance consumes all the events from
>> admin
>> > > topic (unique consumer id) and keeps them in the cache in memory.
>> > > Now the requirement is to persist the cache and make is shared
>> between the
>> > > application instances, so each instance would consume partitions of
>> admin
>> > > topic and write to admin cache.
>> > >
>> > > If we want to use kafka streams, the application is so much evolved
>> that
>> > > it is difficult to migrate to streams at this stage. Secondly, from
>> past
>> > > mail chains, streams also won't serve the requirement since local
>> state
>> > > stores would just hold the local state of admin data and the cache
>> written
>> > > by each instance won't be shared with other instances.
>> > >
>> > > Global state stores may help but again it requires writing to the
>> topic
>> > > which is then synced with the state stores in the instances and the
>> > > instances may not be in sync with each.
>> > > I am not sure if this would cause any inconsistencies since i don't
>> know
>> > > how the events would flow from source e.g. if admin data is consumed
>> by one
>> > > instance which then modified the topic but it is not yet synced to
>> all the
>> > > global state stores and the next event arrived on the main consumer
>> on a
>> > > different instance and it tried to read from store cache then it
>> doesn't
>> > > get the data, so the event passed on without enriched data.
>> > > That's pretty much about the use case.
>> > >
>> > >
>> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org>
>> wrote:
>> > >
>> > >> Hi Pushkar,
>> > >>
>> > >> I’ve been wondering if we should add writable tables to the Streams
>> api.
>> > >> Can you explain more about your use case and how it would integrate
>> with
>> > >> your application?
>> > >>
>> > >> Incidentally, this would also help us provide more concrete advice.
>> > >>
>> > >> Thanks!
>> > >> John
>> > >>
>> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
>> > >> > Both stores sever a different purpose.
>> > >> >
>> > >> > Regular stores allow you to store state the application computes.
>> > >> > Writing into the changelog is a fault-tolerance mechanism.
>> > >> >
>> > >> > Global store hold "axially" data that is provided from "outside"
>> of the
>> > >> > app. There is no changelog topic, but only the input topic (that
>> is used
>> > >> > to re-create the global state).
>> > >> >
>> > >> > Local stores are sharded and updates are "sync" as they don't need
>> to be
>> > >> > shared with anybody else.
>> > >> >
>> > >> > For global stores, as all instances need to be updated, updates are
>> > >> > async (we don't know when which instance will update it's own
>> global
>> > >> > store replica).
>> > >> >
>> > >> > >> Say one stream thread updates the topic for global store and
>> starts
>> > >> > >> processing next event wherein the processor tries to read the
>> global
>> > >> store
>> > >> > >> which may not have been synced with the topic?
>> > >> >
>> > >> > Correct. There is no guarantee when the update to the global store
>> will
>> > >> > be applied. As said, global stores are not designed to hold data
>> the
>> > >> > application computes.
>> > >> >
>> > >> >
>> > >> > -Matthias
>> > >> >
>> > >> >
>> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> > >> > > thanks... will try with GlobalKTable.
>> > >> > > As a side question, I didn't really understand the significance
>> of
>> > >> global
>> > >> > > state store which kind of works in a reverse way to local state
>> store
>> > >> i.e.
>> > >> > > local state store is updated and then saved to changelog topic
>> > >> whereas in
>> > >> > > case of global state store the topic is updated first and then
>> synced
>> > >> to
>> > >> > > global state store. Do these two work in sync i.e. the update to
>> > >> topic and
>> > >> > > global state store ?
>> > >> > >
>> > >> > > Say one stream thread updates the topic for global store and
>> starts
>> > >> > > processing next event wherein the processor tries to read the
>> global
>> > >> store
>> > >> > > which may not have been synced with the topic?
>> > >> > >
>> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mjsax@apache.org
>> >
>> > >> wrote:
>> > >> > >
>> > >> > >> Yes.
>> > >> > >>
>> > >> > >> A `GlobalKTable` uses a global store internally.
>> > >> > >>
>> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
>> > >> > >> `Topology.addGlobalStore()` to add a global store "manually".
>> > >> > >>
>> > >> > >>
>> > >> > >> -Matthias
>> > >> > >>
>> > >> > >>
>> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> > >> > >>> Thanks Matthias.
>> > >> > >>> Can you elaborate on the replicated caching layer part?
>> > >> > >>> When you say global stores, do you mean GlobalKTable created
>> from a
>> > >> topic
>> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>> > >> > >>>
>> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
>> mjsax@apache.org>
>> > >> > >> wrote:
>> > >> > >>>
>> > >> > >>>> It's not possible to modify state store from "outside".
>> > >> > >>>>
>> > >> > >>>> If you want to build a "replicated caching layer", you could
>> use
>> > >> global
>> > >> > >>>> stores and write into the corresponding topics to update all
>> > >> stores. Of
>> > >> > >>>> course, those updates would be async.
>> > >> > >>>>
>> > >> > >>>>
>> > >> > >>>> -Matthias
>> > >> > >>>>
>> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>> > >> > >>>>> Hi All,
>> > >> > >>>>>
>> > >> > >>>>> I am wondering if this is possible: i have been asked to use
>> state
>> > >> > >> stores
>> > >> > >>>>> as a general replicated cache among multiple instances of
>> service
>> > >> > >>>> instances
>> > >> > >>>>> however the state store is created through streambuilder but
>> is
>> > >> not
>> > >> > >>>>> actually modified through stream processor topology however
>> it is
>> > >> to be
>> > >> > >>>>> modified from outside the stream topology. So, essentially,
>> the
>> > >> state
>> > >> > >>>> store
>> > >> > >>>>> is just to be created from streambuilder and then to be used
>> as an
>> > >> > >>>>> application level cache that will get replicated between
>> > >> application
>> > >> > >>>>> instances. Is this possible using state stores?
>> > >> > >>>>>
>> > >> > >>>>> Secondly, if possible, is this a good design approach?
>> > >> > >>>>>
>> > >> > >>>>> Appreciate your response since I don't know the internals of
>> state
>> > >> > >>>> stores.
>> > >> > >>>>>
>> > >> > >>>>
>> > >> > >>>>
>> > >> > >>>
>> > >> > >>
>> > >> > >>
>> > >> > >
>> > >> >
>> > >> >
>> > >> > Attachments:
>> > >> > * signature.asc
>> > >>
>> > >
>> >
>>
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks John... what parameters would affect the latency in case
GlobalKTable will be used and is there any configurations that could be
tuned to minimize the latency of sync with input topic?

On Mon, May 4, 2020 at 10:20 PM John Roesler <vv...@apache.org> wrote:

> Hello Pushkar,
>
> Yes, that’s correct. The operation you describe is currently not
> supported. If you want to keep the structure you described in place, I’d
> suggest using an external database for the admin objects. I’ll give another
> idea below.
>
> With your current architecture, I’m a little concerned about data races.
> From what I saw, nothing would prevent processing stream records with agent
> 10 before you process the admin record with agent 10. This problem will
> persist no matter where you locate the cache.
>
> GlobalKTable would no doubt make it worse, since it increases the latency
> before admin record 10 is queriable everywhere.
>
> I think you’ll want to make a call between architecture simplicity (remote
> cache or global KTable) vs the probability of missed joins.
>
> I think the “best” way to solve this problem (that comes to mind anyway)
> might be to
> 1. Repartition the stream to be co-partitioned with the admin records.
> 2. Do a local (not global) stream-table join
> 3. Enable task idling
>
> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
> agent Id the new key of the stream, and then use ‘through’, (where the
> intermediate topic has the same number of partitions as the admin topic) to
> do the repartitioning. In 2.6, there is a “repartition” operator that will
> make this easier.
>
> The repartition ensures that all stream records with agent id 10 will be
> processed by the same thread that processes the admin records with agent id
> 10, hence it will be able to find agent 10 in the local KTable store.
>
> Task idling will minimize your chances of missing any enrichments. When a
> task has two inputs (E.g., your repartitioned stream joining with the admin
> table), it makes Streams wait until both inputs are buffered before
> processing, so it can do a better job of processing in timestamp order.
>
> I hope this helps!
> -John
>
> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > If i understand correctly, Kafka is not designed to provide replicated
> > caching mechanism wherein the updates to cache will be synchronous across
> > multiple cache instances.
> >
> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pd...@gmail.com>
> wrote:
> >
> > > Thanks John.
> > >
> > > Actually, this is a normal consumer-producer application wherein there
> are
> > > 2 consumers (admin consumer and main consumer) consuming messages from
> 2
> > > different topics.
> > > One of the consumers consumes messages from a admin topic and populates
> > > data in a cache e.g. lets say agent with agent id 10 for which the
> first
> > > name and last name is received is populated in cache. When the other
> > > consumer consumes message and it has agent id 10 then it reads the
> cache,
> > > appends the first name and last name and then sends enriched event to
> > > producer.
> > > In this case, each application instance consumes all the events from
> admin
> > > topic (unique consumer id) and keeps them in the cache in memory.
> > > Now the requirement is to persist the cache and make is shared between
> the
> > > application instances, so each instance would consume partitions of
> admin
> > > topic and write to admin cache.
> > >
> > > If we want to use kafka streams, the application is so much evolved
> that
> > > it is difficult to migrate to streams at this stage. Secondly, from
> past
> > > mail chains, streams also won't serve the requirement since local state
> > > stores would just hold the local state of admin data and the cache
> written
> > > by each instance won't be shared with other instances.
> > >
> > > Global state stores may help but again it requires writing to the topic
> > > which is then synced with the state stores in the instances and the
> > > instances may not be in sync with each.
> > > I am not sure if this would cause any inconsistencies since i don't
> know
> > > how the events would flow from source e.g. if admin data is consumed
> by one
> > > instance which then modified the topic but it is not yet synced to all
> the
> > > global state stores and the next event arrived on the main consumer on
> a
> > > different instance and it tried to read from store cache then it
> doesn't
> > > get the data, so the event passed on without enriched data.
> > > That's pretty much about the use case.
> > >
> > >
> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org>
> wrote:
> > >
> > >> Hi Pushkar,
> > >>
> > >> I’ve been wondering if we should add writable tables to the Streams
> api.
> > >> Can you explain more about your use case and how it would integrate
> with
> > >> your application?
> > >>
> > >> Incidentally, this would also help us provide more concrete advice.
> > >>
> > >> Thanks!
> > >> John
> > >>
> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > >> > Both stores sever a different purpose.
> > >> >
> > >> > Regular stores allow you to store state the application computes.
> > >> > Writing into the changelog is a fault-tolerance mechanism.
> > >> >
> > >> > Global store hold "axially" data that is provided from "outside" of
> the
> > >> > app. There is no changelog topic, but only the input topic (that is
> used
> > >> > to re-create the global state).
> > >> >
> > >> > Local stores are sharded and updates are "sync" as they don't need
> to be
> > >> > shared with anybody else.
> > >> >
> > >> > For global stores, as all instances need to be updated, updates are
> > >> > async (we don't know when which instance will update it's own global
> > >> > store replica).
> > >> >
> > >> > >> Say one stream thread updates the topic for global store and
> starts
> > >> > >> processing next event wherein the processor tries to read the
> global
> > >> store
> > >> > >> which may not have been synced with the topic?
> > >> >
> > >> > Correct. There is no guarantee when the update to the global store
> will
> > >> > be applied. As said, global stores are not designed to hold data the
> > >> > application computes.
> > >> >
> > >> >
> > >> > -Matthias
> > >> >
> > >> >
> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > >> > > thanks... will try with GlobalKTable.
> > >> > > As a side question, I didn't really understand the significance of
> > >> global
> > >> > > state store which kind of works in a reverse way to local state
> store
> > >> i.e.
> > >> > > local state store is updated and then saved to changelog topic
> > >> whereas in
> > >> > > case of global state store the topic is updated first and then
> synced
> > >> to
> > >> > > global state store. Do these two work in sync i.e. the update to
> > >> topic and
> > >> > > global state store ?
> > >> > >
> > >> > > Say one stream thread updates the topic for global store and
> starts
> > >> > > processing next event wherein the processor tries to read the
> global
> > >> store
> > >> > > which may not have been synced with the topic?
> > >> > >
> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >> > >
> > >> > >> Yes.
> > >> > >>
> > >> > >> A `GlobalKTable` uses a global store internally.
> > >> > >>
> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > >> > >> `Topology.addGlobalStore()` to add a global store "manually".
> > >> > >>
> > >> > >>
> > >> > >> -Matthias
> > >> > >>
> > >> > >>
> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > >> > >>> Thanks Matthias.
> > >> > >>> Can you elaborate on the replicated caching layer part?
> > >> > >>> When you say global stores, do you mean GlobalKTable created
> from a
> > >> topic
> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> > >> > >>>
> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> mjsax@apache.org>
> > >> > >> wrote:
> > >> > >>>
> > >> > >>>> It's not possible to modify state store from "outside".
> > >> > >>>>
> > >> > >>>> If you want to build a "replicated caching layer", you could
> use
> > >> global
> > >> > >>>> stores and write into the corresponding topics to update all
> > >> stores. Of
> > >> > >>>> course, those updates would be async.
> > >> > >>>>
> > >> > >>>>
> > >> > >>>> -Matthias
> > >> > >>>>
> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > >> > >>>>> Hi All,
> > >> > >>>>>
> > >> > >>>>> I am wondering if this is possible: i have been asked to use
> state
> > >> > >> stores
> > >> > >>>>> as a general replicated cache among multiple instances of
> service
> > >> > >>>> instances
> > >> > >>>>> however the state store is created through streambuilder but
> is
> > >> not
> > >> > >>>>> actually modified through stream processor topology however
> it is
> > >> to be
> > >> > >>>>> modified from outside the stream topology. So, essentially,
> the
> > >> state
> > >> > >>>> store
> > >> > >>>>> is just to be created from streambuilder and then to be used
> as an
> > >> > >>>>> application level cache that will get replicated between
> > >> application
> > >> > >>>>> instances. Is this possible using state stores?
> > >> > >>>>>
> > >> > >>>>> Secondly, if possible, is this a good design approach?
> > >> > >>>>>
> > >> > >>>>> Appreciate your response since I don't know the internals of
> state
> > >> > >>>> stores.
> > >> > >>>>>
> > >> > >>>>
> > >> > >>>>
> > >> > >>>
> > >> > >>
> > >> > >>
> > >> > >
> > >> >
> > >> >
> > >> > Attachments:
> > >> > * signature.asc
> > >>
> > >
> >
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by John Roesler <vv...@apache.org>.
Hello Pushkar,

Yes, that’s correct. The operation you describe is currently not supported. If you want to keep the structure you described in place, I’d suggest using an external database for the admin objects. I’ll give another idea below.  

With your current architecture, I’m a little concerned about data races. From what I saw, nothing would prevent processing stream records with agent 10 before you process the admin record with agent 10. This problem will persist no matter where you locate the cache.

GlobalKTable would no doubt make it worse, since it increases the latency before admin record 10 is queriable everywhere.

I think you’ll want to make a call between architecture simplicity (remote cache or global KTable) vs the probability of missed joins. 

I think the “best” way to solve this problem (that comes to mind anyway) might be to
1. Repartition the stream to be co-partitioned with the admin records.
2. Do a local (not global) stream-table join
3. Enable task idling

You can do the repartition today with a ‘map’ or ‘selectKey’ to make the agent Id the new key of the stream, and then use ‘through’, (where the intermediate topic has the same number of partitions as the admin topic) to do the repartitioning. In 2.6, there is a “repartition” operator that will make this easier. 

The repartition ensures that all stream records with agent id 10 will be processed by the same thread that processes the admin records with agent id 10, hence it will be able to find agent 10 in the local KTable store. 

Task idling will minimize your chances of missing any enrichments. When a task has two inputs (E.g., your repartitioned stream joining with the admin table), it makes Streams wait until both inputs are buffered before processing, so it can do a better job of processing in timestamp order. 

I hope this helps!
-John 

On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> If i understand correctly, Kafka is not designed to provide replicated
> caching mechanism wherein the updates to cache will be synchronous across
> multiple cache instances.
> 
> On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pd...@gmail.com> wrote:
> 
> > Thanks John.
> >
> > Actually, this is a normal consumer-producer application wherein there are
> > 2 consumers (admin consumer and main consumer) consuming messages from 2
> > different topics.
> > One of the consumers consumes messages from a admin topic and populates
> > data in a cache e.g. lets say agent with agent id 10 for which the first
> > name and last name is received is populated in cache. When the other
> > consumer consumes message and it has agent id 10 then it reads the cache,
> > appends the first name and last name and then sends enriched event to
> > producer.
> > In this case, each application instance consumes all the events from admin
> > topic (unique consumer id) and keeps them in the cache in memory.
> > Now the requirement is to persist the cache and make is shared between the
> > application instances, so each instance would consume partitions of admin
> > topic and write to admin cache.
> >
> > If we want to use kafka streams, the application is so much evolved that
> > it is difficult to migrate to streams at this stage. Secondly, from past
> > mail chains, streams also won't serve the requirement since local state
> > stores would just hold the local state of admin data and the cache written
> > by each instance won't be shared with other instances.
> >
> > Global state stores may help but again it requires writing to the topic
> > which is then synced with the state stores in the instances and the
> > instances may not be in sync with each.
> > I am not sure if this would cause any inconsistencies since i don't know
> > how the events would flow from source e.g. if admin data is consumed by one
> > instance which then modified the topic but it is not yet synced to all the
> > global state stores and the next event arrived on the main consumer on a
> > different instance and it tried to read from store cache then it doesn't
> > get the data, so the event passed on without enriched data.
> > That's pretty much about the use case.
> >
> >
> > On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org> wrote:
> >
> >> Hi Pushkar,
> >>
> >> I’ve been wondering if we should add writable tables to the Streams api.
> >> Can you explain more about your use case and how it would integrate with
> >> your application?
> >>
> >> Incidentally, this would also help us provide more concrete advice.
> >>
> >> Thanks!
> >> John
> >>
> >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> >> > Both stores sever a different purpose.
> >> >
> >> > Regular stores allow you to store state the application computes.
> >> > Writing into the changelog is a fault-tolerance mechanism.
> >> >
> >> > Global store hold "axially" data that is provided from "outside" of the
> >> > app. There is no changelog topic, but only the input topic (that is used
> >> > to re-create the global state).
> >> >
> >> > Local stores are sharded and updates are "sync" as they don't need to be
> >> > shared with anybody else.
> >> >
> >> > For global stores, as all instances need to be updated, updates are
> >> > async (we don't know when which instance will update it's own global
> >> > store replica).
> >> >
> >> > >> Say one stream thread updates the topic for global store and starts
> >> > >> processing next event wherein the processor tries to read the global
> >> store
> >> > >> which may not have been synced with the topic?
> >> >
> >> > Correct. There is no guarantee when the update to the global store will
> >> > be applied. As said, global stores are not designed to hold data the
> >> > application computes.
> >> >
> >> >
> >> > -Matthias
> >> >
> >> >
> >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> >> > > thanks... will try with GlobalKTable.
> >> > > As a side question, I didn't really understand the significance of
> >> global
> >> > > state store which kind of works in a reverse way to local state store
> >> i.e.
> >> > > local state store is updated and then saved to changelog topic
> >> whereas in
> >> > > case of global state store the topic is updated first and then synced
> >> to
> >> > > global state store. Do these two work in sync i.e. the update to
> >> topic and
> >> > > global state store ?
> >> > >
> >> > > Say one stream thread updates the topic for global store and starts
> >> > > processing next event wherein the processor tries to read the global
> >> store
> >> > > which may not have been synced with the topic?
> >> > >
> >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >> > >
> >> > >> Yes.
> >> > >>
> >> > >> A `GlobalKTable` uses a global store internally.
> >> > >>
> >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> >> > >> `Topology.addGlobalStore()` to add a global store "manually".
> >> > >>
> >> > >>
> >> > >> -Matthias
> >> > >>
> >> > >>
> >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >> > >>> Thanks Matthias.
> >> > >>> Can you elaborate on the replicated caching layer part?
> >> > >>> When you say global stores, do you mean GlobalKTable created from a
> >> topic
> >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >> > >>>
> >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> >> > >> wrote:
> >> > >>>
> >> > >>>> It's not possible to modify state store from "outside".
> >> > >>>>
> >> > >>>> If you want to build a "replicated caching layer", you could use
> >> global
> >> > >>>> stores and write into the corresponding topics to update all
> >> stores. Of
> >> > >>>> course, those updates would be async.
> >> > >>>>
> >> > >>>>
> >> > >>>> -Matthias
> >> > >>>>
> >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >> > >>>>> Hi All,
> >> > >>>>>
> >> > >>>>> I am wondering if this is possible: i have been asked to use state
> >> > >> stores
> >> > >>>>> as a general replicated cache among multiple instances of service
> >> > >>>> instances
> >> > >>>>> however the state store is created through streambuilder but is
> >> not
> >> > >>>>> actually modified through stream processor topology however it is
> >> to be
> >> > >>>>> modified from outside the stream topology. So, essentially, the
> >> state
> >> > >>>> store
> >> > >>>>> is just to be created from streambuilder and then to be used as an
> >> > >>>>> application level cache that will get replicated between
> >> application
> >> > >>>>> instances. Is this possible using state stores?
> >> > >>>>>
> >> > >>>>> Secondly, if possible, is this a good design approach?
> >> > >>>>>
> >> > >>>>> Appreciate your response since I don't know the internals of state
> >> > >>>> stores.
> >> > >>>>>
> >> > >>>>
> >> > >>>>
> >> > >>>
> >> > >>
> >> > >>
> >> > >
> >> >
> >> >
> >> > Attachments:
> >> > * signature.asc
> >>
> >
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
If i understand correctly, Kafka is not designed to provide replicated
caching mechanism wherein the updates to cache will be synchronous across
multiple cache instances.

On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pd...@gmail.com> wrote:

> Thanks John.
>
> Actually, this is a normal consumer-producer application wherein there are
> 2 consumers (admin consumer and main consumer) consuming messages from 2
> different topics.
> One of the consumers consumes messages from a admin topic and populates
> data in a cache e.g. lets say agent with agent id 10 for which the first
> name and last name is received is populated in cache. When the other
> consumer consumes message and it has agent id 10 then it reads the cache,
> appends the first name and last name and then sends enriched event to
> producer.
> In this case, each application instance consumes all the events from admin
> topic (unique consumer id) and keeps them in the cache in memory.
> Now the requirement is to persist the cache and make is shared between the
> application instances, so each instance would consume partitions of admin
> topic and write to admin cache.
>
> If we want to use kafka streams, the application is so much evolved that
> it is difficult to migrate to streams at this stage. Secondly, from past
> mail chains, streams also won't serve the requirement since local state
> stores would just hold the local state of admin data and the cache written
> by each instance won't be shared with other instances.
>
> Global state stores may help but again it requires writing to the topic
> which is then synced with the state stores in the instances and the
> instances may not be in sync with each.
> I am not sure if this would cause any inconsistencies since i don't know
> how the events would flow from source e.g. if admin data is consumed by one
> instance which then modified the topic but it is not yet synced to all the
> global state stores and the next event arrived on the main consumer on a
> different instance and it tried to read from store cache then it doesn't
> get the data, so the event passed on without enriched data.
> That's pretty much about the use case.
>
>
> On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org> wrote:
>
>> Hi Pushkar,
>>
>> I’ve been wondering if we should add writable tables to the Streams api.
>> Can you explain more about your use case and how it would integrate with
>> your application?
>>
>> Incidentally, this would also help us provide more concrete advice.
>>
>> Thanks!
>> John
>>
>> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
>> > Both stores sever a different purpose.
>> >
>> > Regular stores allow you to store state the application computes.
>> > Writing into the changelog is a fault-tolerance mechanism.
>> >
>> > Global store hold "axially" data that is provided from "outside" of the
>> > app. There is no changelog topic, but only the input topic (that is used
>> > to re-create the global state).
>> >
>> > Local stores are sharded and updates are "sync" as they don't need to be
>> > shared with anybody else.
>> >
>> > For global stores, as all instances need to be updated, updates are
>> > async (we don't know when which instance will update it's own global
>> > store replica).
>> >
>> > >> Say one stream thread updates the topic for global store and starts
>> > >> processing next event wherein the processor tries to read the global
>> store
>> > >> which may not have been synced with the topic?
>> >
>> > Correct. There is no guarantee when the update to the global store will
>> > be applied. As said, global stores are not designed to hold data the
>> > application computes.
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> > > thanks... will try with GlobalKTable.
>> > > As a side question, I didn't really understand the significance of
>> global
>> > > state store which kind of works in a reverse way to local state store
>> i.e.
>> > > local state store is updated and then saved to changelog topic
>> whereas in
>> > > case of global state store the topic is updated first and then synced
>> to
>> > > global state store. Do these two work in sync i.e. the update to
>> topic and
>> > > global state store ?
>> > >
>> > > Say one stream thread updates the topic for global store and starts
>> > > processing next event wherein the processor tries to read the global
>> store
>> > > which may not have been synced with the topic?
>> > >
>> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> > >
>> > >> Yes.
>> > >>
>> > >> A `GlobalKTable` uses a global store internally.
>> > >>
>> > >> You can also use `StreamsBuilder.addGlobalStore()` or
>> > >> `Topology.addGlobalStore()` to add a global store "manually".
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >>
>> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> > >>> Thanks Matthias.
>> > >>> Can you elaborate on the replicated caching layer part?
>> > >>> When you say global stores, do you mean GlobalKTable created from a
>> topic
>> > >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>> > >>>
>> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
>> > >> wrote:
>> > >>>
>> > >>>> It's not possible to modify state store from "outside".
>> > >>>>
>> > >>>> If you want to build a "replicated caching layer", you could use
>> global
>> > >>>> stores and write into the corresponding topics to update all
>> stores. Of
>> > >>>> course, those updates would be async.
>> > >>>>
>> > >>>>
>> > >>>> -Matthias
>> > >>>>
>> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>> > >>>>> Hi All,
>> > >>>>>
>> > >>>>> I am wondering if this is possible: i have been asked to use state
>> > >> stores
>> > >>>>> as a general replicated cache among multiple instances of service
>> > >>>> instances
>> > >>>>> however the state store is created through streambuilder but is
>> not
>> > >>>>> actually modified through stream processor topology however it is
>> to be
>> > >>>>> modified from outside the stream topology. So, essentially, the
>> state
>> > >>>> store
>> > >>>>> is just to be created from streambuilder and then to be used as an
>> > >>>>> application level cache that will get replicated between
>> application
>> > >>>>> instances. Is this possible using state stores?
>> > >>>>>
>> > >>>>> Secondly, if possible, is this a good design approach?
>> > >>>>>
>> > >>>>> Appreciate your response since I don't know the internals of state
>> > >>>> stores.
>> > >>>>>
>> > >>>>
>> > >>>>
>> > >>>
>> > >>
>> > >>
>> > >
>> >
>> >
>> > Attachments:
>> > * signature.asc
>>
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks John.

Actually, this is a normal consumer-producer application wherein there are
2 consumers (admin consumer and main consumer) consuming messages from 2
different topics.
One of the consumers consumes messages from a admin topic and populates
data in a cache e.g. lets say agent with agent id 10 for which the first
name and last name is received is populated in cache. When the other
consumer consumes message and it has agent id 10 then it reads the cache,
appends the first name and last name and then sends enriched event to
producer.
In this case, each application instance consumes all the events from admin
topic (unique consumer id) and keeps them in the cache in memory.
Now the requirement is to persist the cache and make is shared between the
application instances, so each instance would consume partitions of admin
topic and write to admin cache.

If we want to use kafka streams, the application is so much evolved that it
is difficult to migrate to streams at this stage. Secondly, from past mail
chains, streams also won't serve the requirement since local state stores
would just hold the local state of admin data and the cache written by each
instance won't be shared with other instances.

Global state stores may help but again it requires writing to the topic
which is then synced with the state stores in the instances and the
instances may not be in sync with each.
I am not sure if this would cause any inconsistencies since i don't know
how the events would flow from source e.g. if admin data is consumed by one
instance which then modified the topic but it is not yet synced to all the
global state stores and the next event arrived on the main consumer on a
different instance and it tried to read from store cache then it doesn't
get the data, so the event passed on without enriched data.
That's pretty much about the use case.


On Sun, May 3, 2020 at 9:42 PM John Roesler <vv...@apache.org> wrote:

> Hi Pushkar,
>
> I’ve been wondering if we should add writable tables to the Streams api.
> Can you explain more about your use case and how it would integrate with
> your application?
>
> Incidentally, this would also help us provide more concrete advice.
>
> Thanks!
> John
>
> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > Both stores sever a different purpose.
> >
> > Regular stores allow you to store state the application computes.
> > Writing into the changelog is a fault-tolerance mechanism.
> >
> > Global store hold "axially" data that is provided from "outside" of the
> > app. There is no changelog topic, but only the input topic (that is used
> > to re-create the global state).
> >
> > Local stores are sharded and updates are "sync" as they don't need to be
> > shared with anybody else.
> >
> > For global stores, as all instances need to be updated, updates are
> > async (we don't know when which instance will update it's own global
> > store replica).
> >
> > >> Say one stream thread updates the topic for global store and starts
> > >> processing next event wherein the processor tries to read the global
> store
> > >> which may not have been synced with the topic?
> >
> > Correct. There is no guarantee when the update to the global store will
> > be applied. As said, global stores are not designed to hold data the
> > application computes.
> >
> >
> > -Matthias
> >
> >
> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > > thanks... will try with GlobalKTable.
> > > As a side question, I didn't really understand the significance of
> global
> > > state store which kind of works in a reverse way to local state store
> i.e.
> > > local state store is updated and then saved to changelog topic whereas
> in
> > > case of global state store the topic is updated first and then synced
> to
> > > global state store. Do these two work in sync i.e. the update to topic
> and
> > > global state store ?
> > >
> > > Say one stream thread updates the topic for global store and starts
> > > processing next event wherein the processor tries to read the global
> store
> > > which may not have been synced with the topic?
> > >
> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
> wrote:
> > >
> > >> Yes.
> > >>
> > >> A `GlobalKTable` uses a global store internally.
> > >>
> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > >> `Topology.addGlobalStore()` to add a global store "manually".
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > >>> Thanks Matthias.
> > >>> Can you elaborate on the replicated caching layer part?
> > >>> When you say global stores, do you mean GlobalKTable created from a
> topic
> > >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> > >>>
> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >>>
> > >>>> It's not possible to modify state store from "outside".
> > >>>>
> > >>>> If you want to build a "replicated caching layer", you could use
> global
> > >>>> stores and write into the corresponding topics to update all
> stores. Of
> > >>>> course, those updates would be async.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > >>>>> Hi All,
> > >>>>>
> > >>>>> I am wondering if this is possible: i have been asked to use state
> > >> stores
> > >>>>> as a general replicated cache among multiple instances of service
> > >>>> instances
> > >>>>> however the state store is created through streambuilder but is not
> > >>>>> actually modified through stream processor topology however it is
> to be
> > >>>>> modified from outside the stream topology. So, essentially, the
> state
> > >>>> store
> > >>>>> is just to be created from streambuilder and then to be used as an
> > >>>>> application level cache that will get replicated between
> application
> > >>>>> instances. Is this possible using state stores?
> > >>>>>
> > >>>>> Secondly, if possible, is this a good design approach?
> > >>>>>
> > >>>>> Appreciate your response since I don't know the internals of state
> > >>>> stores.
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
> > Attachments:
> > * signature.asc
>

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by John Roesler <vv...@apache.org>.
Hi Pushkar,

I’ve been wondering if we should add writable tables to the Streams api. Can you explain more about your use case and how it would integrate with your application?

Incidentally, this would also help us provide more concrete advice. 

Thanks!
John

On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> Both stores sever a different purpose.
> 
> Regular stores allow you to store state the application computes.
> Writing into the changelog is a fault-tolerance mechanism.
> 
> Global store hold "axially" data that is provided from "outside" of the
> app. There is no changelog topic, but only the input topic (that is used
> to re-create the global state).
> 
> Local stores are sharded and updates are "sync" as they don't need to be
> shared with anybody else.
> 
> For global stores, as all instances need to be updated, updates are
> async (we don't know when which instance will update it's own global
> store replica).
> 
> >> Say one stream thread updates the topic for global store and starts
> >> processing next event wherein the processor tries to read the global store
> >> which may not have been synced with the topic?
> 
> Correct. There is no guarantee when the update to the global store will
> be applied. As said, global stores are not designed to hold data the
> application computes.
> 
> 
> -Matthias
> 
> 
> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > thanks... will try with GlobalKTable.
> > As a side question, I didn't really understand the significance of global
> > state store which kind of works in a reverse way to local state store i.e.
> > local state store is updated and then saved to changelog topic whereas in
> > case of global state store the topic is updated first and then synced to
> > global state store. Do these two work in sync i.e. the update to topic and
> > global state store ?
> > 
> > Say one stream thread updates the topic for global store and starts
> > processing next event wherein the processor tries to read the global store
> > which may not have been synced with the topic?
> > 
> > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> > 
> >> Yes.
> >>
> >> A `GlobalKTable` uses a global store internally.
> >>
> >> You can also use `StreamsBuilder.addGlobalStore()` or
> >> `Topology.addGlobalStore()` to add a global store "manually".
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>> Thanks Matthias.
> >>> Can you elaborate on the replicated caching layer part?
> >>> When you say global stores, do you mean GlobalKTable created from a topic
> >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >>>
> >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>
> >>>> It's not possible to modify state store from "outside".
> >>>>
> >>>> If you want to build a "replicated caching layer", you could use global
> >>>> stores and write into the corresponding topics to update all stores. Of
> >>>> course, those updates would be async.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>>>> Hi All,
> >>>>>
> >>>>> I am wondering if this is possible: i have been asked to use state
> >> stores
> >>>>> as a general replicated cache among multiple instances of service
> >>>> instances
> >>>>> however the state store is created through streambuilder but is not
> >>>>> actually modified through stream processor topology however it is to be
> >>>>> modified from outside the stream topology. So, essentially, the state
> >>>> store
> >>>>> is just to be created from streambuilder and then to be used as an
> >>>>> application level cache that will get replicated between application
> >>>>> instances. Is this possible using state stores?
> >>>>>
> >>>>> Secondly, if possible, is this a good design approach?
> >>>>>
> >>>>> Appreciate your response since I don't know the internals of state
> >>>> stores.
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> > 
> 
> 
> Attachments:
> * signature.asc

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Posted by "Matthias J. Sax" <mj...@apache.org>.
Both stores sever a different purpose.

Regular stores allow you to store state the application computes.
Writing into the changelog is a fault-tolerance mechanism.

Global store hold "axially" data that is provided from "outside" of the
app. There is no changelog topic, but only the input topic (that is used
to re-create the global state).

Local stores are sharded and updates are "sync" as they don't need to be
shared with anybody else.

For global stores, as all instances need to be updated, updates are
async (we don't know when which instance will update it's own global
store replica).

>> Say one stream thread updates the topic for global store and starts
>> processing next event wherein the processor tries to read the global store
>> which may not have been synced with the topic?

Correct. There is no guarantee when the update to the global store will
be applied. As said, global stores are not designed to hold data the
application computes.


-Matthias


On 4/30/20 11:11 PM, Pushkar Deole wrote:
> thanks... will try with GlobalKTable.
> As a side question, I didn't really understand the significance of global
> state store which kind of works in a reverse way to local state store i.e.
> local state store is updated and then saved to changelog topic whereas in
> case of global state store the topic is updated first and then synced to
> global state store. Do these two work in sync i.e. the update to topic and
> global state store ?
> 
> Say one stream thread updates the topic for global store and starts
> processing next event wherein the processor tries to read the global store
> which may not have been synced with the topic?
> 
> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> Yes.
>>
>> A `GlobalKTable` uses a global store internally.
>>
>> You can also use `StreamsBuilder.addGlobalStore()` or
>> `Topology.addGlobalStore()` to add a global store "manually".
>>
>>
>> -Matthias
>>
>>
>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>> Thanks Matthias.
>>> Can you elaborate on the replicated caching layer part?
>>> When you say global stores, do you mean GlobalKTable created from a topic
>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>>
>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>
>>>> It's not possible to modify state store from "outside".
>>>>
>>>> If you want to build a "replicated caching layer", you could use global
>>>> stores and write into the corresponding topics to update all stores. Of
>>>> course, those updates would be async.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>>>> Hi All,
>>>>>
>>>>> I am wondering if this is possible: i have been asked to use state
>> stores
>>>>> as a general replicated cache among multiple instances of service
>>>> instances
>>>>> however the state store is created through streambuilder but is not
>>>>> actually modified through stream processor topology however it is to be
>>>>> modified from outside the stream topology. So, essentially, the state
>>>> store
>>>>> is just to be created from streambuilder and then to be used as an
>>>>> application level cache that will get replicated between application
>>>>> instances. Is this possible using state stores?
>>>>>
>>>>> Secondly, if possible, is this a good design approach?
>>>>>
>>>>> Appreciate your response since I don't know the internals of state
>>>> stores.
>>>>>
>>>>
>>>>
>>>
>>
>>
>