Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/06/24 03:05:06 UTC

Re: Can Kafka state stores be used as an application-level cache that the application modifies from outside the stream topology?

Hi Matthias,

I have configured the GlobalKTable to stream from a topic, and the application
is working fine. However, in our automated build tests I sometimes get the
exception below. I believe it is caused by a race between the actual topic
creation and the service startup (topic creation may not complete
immediately, since the API returns a future).
My question: is there any property I can set so that the Streams application
retries establishing the global store for a while before giving up with an
exception?

(logged by com.avaya.analytics.dsi.DsiMain on thread "main" at level DEBUG)
org.apache.kafka.streams.errors.StreamsException: There are no partitions available for topic analytics-group-cache when initializing global store group-cache-store
	at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.topicPartitionsForStore(GlobalStateManagerImpl.java:265)
	at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.register(GlobalStateManagerImpl.java:190)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:101)
	at org.apache.kafka.streams.state.internals.InMemoryKeyValueStore.init(InMemoryKeyValueStore.java:58)
	at org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder$InMemoryTimestampedKeyValueStoreMarker.init(TimestampedKeyValueStoreBuilder.java:100)
	at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
	at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:112)
	at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:136)
	at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.initialize(GlobalStreamThread.java:229)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.initialize(GlobalStreamThread.java:345)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:270)
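Nothing in this thread confirms a Streams property that retries this particular check, so the following is only a workaround sketch under an assumption: gate `kafkaStreams.start()` on the global-store topic being visible, polling with a bounded timeout. `StartupRetry` and `waitUntil` are hypothetical names, and the condition is supplied by the caller.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper: polls a condition until it holds or a deadline passes.
 * Intended to run before KafkaStreams#start(), e.g. to wait for a topic to exist.
 */
final class StartupRetry {

    private StartupRetry() {}

    /**
     * Returns true as soon as {@code condition} is true. Returns false if it is
     * still false once {@code timeout} has elapsed, or if the thread is interrupted.
     * Sleeps {@code backoff} between checks.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration backoff) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(backoff.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In the service, the condition could wrap an `Admin`/`AdminClient` call such as `admin.listTopics().names().get().contains(GROUP_CACHE_TOPIC)`, returning false on any exception; if the test harness creates the topic itself, blocking on `createTopics(...).all().get()` before starting the service is a simpler option. Both are assumptions about the build setup, not something confirmed in this thread.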




On Thu, May 28, 2020 at 1:27 PM Pushkar Deole <pd...@gmail.com> wrote:

> Matthias,
>
> I realized that the exception and the actual problem are totally different. The
> problem was that the client was not configured with an SSL truststore while the
> server is SSL-enabled.
> I also found this open bug in Kafka:
> https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on the stream, I am able to get it up and
> running.
>
> Because of the above problem, the issue is very difficult to debug. It would be
> good if the above bug could be fixed as soon as possible, or if a proper
> exception were thrown.
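For reference, a minimal sketch of the client-side settings involved, assuming the broker listener uses SSL with server authentication only. The keys are standard Kafka client configuration names (the matching constants are noted in comments); `SecureStreamsProps`, the truststore path and the password are placeholders. Unprefixed client settings in a Streams `Properties` object are passed on to the embedded clients, including the consumer that loads the global store.

```java
import java.util.Properties;

/**
 * Hypothetical helper that builds Streams properties for an SSL-only broker.
 * Truststore path and password are placeholders supplied by the caller.
 */
final class SecureStreamsProps {

    private SecureStreamsProps() {}

    static Properties build(String applicationId, String bootstrapServers,
                            String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.put("application.id", applicationId);         // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers);   // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Without these, the client speaks plaintext to an SSL listener.
        props.put("security.protocol", "SSL");                    // CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
        props.put("ssl.truststore.location", truststorePath);     // SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
        props.put("ssl.truststore.password", truststorePassword); // SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
        return props;
    }
}
```

If the broker also requires client authentication, keystore settings (`ssl.keystore.location`, `ssl.keystore.password`, `ssl.key.password`) would be needed as well.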
>
> On Wed, May 27, 2020 at 10:59 PM Pushkar Deole <pd...@gmail.com>
> wrote:
>
>> Thanks... I will try increasing the memory in case you don't spot
>> anything wrong with the code. Other services also use streams and a
>> GlobalKTable, but through spring-kafka. I don't think that should matter; it
>> should work with plain kafka-streams code unless I am missing some
>> configuration/setting here.
>>
>> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>
>>> There is no hook. Only a restore listener, but this one is only used
>>> during startup when the global store is loaded. It's not used during
>>> regular processing.
>>>
>>> Depending on your usage, maybe you can switch to a global store instead
>>> of GlobalKTable? That way, you can implement a custom `Processor` and
>>> add a hook manually?
>>>
>>> I don't see anything wrong with your setup. Unclear if/why the global
>>> store would require a lot of memory...
>>>
>>>
>>> -Matthias
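The custom `Processor` route suggested above can be modeled in plain Java to show where such a hook would go. This is not Kafka Streams code: in a real application the body of `process()` would live in the `Processor` passed to `StreamsBuilder#addGlobalStore()`, the `Map` would be the global `KeyValueStore`, and the record timestamp would come from `context.timestamp()`. `HookedGlobalStoreUpdater` and `UpdateListener` are hypothetical names, and the lag figure assumes producer and consumer clocks are reasonably in sync.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Plain-Java model of a global-store update processor with a hook.
 * The Map stands in for the global KeyValueStore.
 */
final class HookedGlobalStoreUpdater<K, V> {

    /** Hypothetical callback fired after each record has been applied. */
    interface UpdateListener<T> {
        void onApplied(T key, long lagMs);
    }

    private final Map<K, V> store = new HashMap<>();
    private final UpdateListener<K> listener;
    private final LongSupplier clockMs;

    HookedGlobalStoreUpdater(UpdateListener<K> listener, LongSupplier clockMs) {
        this.listener = listener;
        this.clockMs = clockMs;
    }

    /** Applies one record from the input topic, then invokes the hook. */
    void process(K key, V value, long recordTimestampMs) {
        if (value == null) {
            store.remove(key); // null value treated as a tombstone (delete)
        } else {
            store.put(key, value);
        }
        // Time between the record's timestamp and the moment it became readable.
        listener.onApplied(key, clockMs.getAsLong() - recordTimestampMs);
    }

    V get(K key) {
        return store.get(key);
    }
}
```

Since the restore listener is described above as covering only the startup load, records bulk-loaded at startup may not pass through such a processor, depending on the Kafka version; the hook would then mainly fire for updates that arrive during regular processing.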
>>>
>>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>>> > Matthias,
>>> > I tried with the default store as well but get the same error. Can you
>>> > please check if I am initializing the global store in the right way:
>>> >
>>> > public void setupGlobalCacheTables(String theKafkaServers) {
>>> >     Properties props = new Properties();
>>> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
>>> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>>> >
>>> >     // Materialize the cache topic as a GlobalKTable backed by a named store
>>> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
>>> >     groupCacheTable = streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>>> >         Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>>> >         Materialized.as(GROUP_CACHE_STORE_NAME));
>>> >     Topology groupCacheTopology = streamsBuilder.build();
>>> >     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>>> >     kafkaStreams.start();
>>> >
>>> >     // Close the streams instance cleanly when the JVM shuts down
>>> >     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>> >         LOG.info("Stopping the stream");
>>> >         kafkaStreams.close();
>>> >     }));
>>> > }
>>> >
>>> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pd...@gmail.com> wrote:
>>> >
>>> >> Hi Matthias,
>>> >>
>>> >> By the way, I used the in-memory global store and the service is giving
>>> >> an out-of-memory error during startup. Unfortunately I don't have the stack
>>> >> trace now, but when I got it the first time, the error was coming from
>>> >> somewhere around memorypool.allocate or a similar method. If I get the
>>> >> stack trace again, I will share it with you.
>>> >> However, the topic the store is reading from is empty, so I am not sure
>>> >> why the GlobalKTable is trying to occupy so much space. The pod memory
>>> >> request and limit are 500 MiB and 750 MiB respectively, so I believe the
>>> >> state store should fit into this memory, since the topic is empty. Can you
>>> >> provide some input on this?
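Kafka's wire protocol frames each response with a 4-byte big-endian size. KAFKA-4493, linked later in this thread, tracks the case where a client that is not configured for SSL connects to an SSL listener; a commonly given explanation for the resulting out-of-memory error is that the plaintext client reads the first bytes of the broker's TLS record as that size and tries to allocate a buffer of that size. The sketch assumes the first bytes received are a TLS 1.2 alert record header (`0x15 0x03 0x03 ...`); the exact bytes depend on the broker's TLS stack, and `SizePrefixMisread` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Shows how a plaintext client could misread a TLS record header as a size prefix. */
final class SizePrefixMisread {

    private SizePrefixMisread() {}

    /** Interprets the first four bytes as a big-endian int32, as a Kafka size prefix would be. */
    static int sizePrefix(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt(); // ByteBuffer defaults to big-endian
    }
}
```

Under that assumption, `0x15030300` decodes to 352,518,912 bytes, roughly 336 MiB for a single buffer, which is a plausible way for an empty topic to exhaust a 500 to 750 MiB pod.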
>>> >>
>>> >>
>>> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pd...@gmail.com>
>>> >> wrote:
>>> >>
>>> >>> Ok... got it. Is there any hook that I can attach to the GlobalKTable
>>> >>> or global store? What I mean is: when the global store is updated with
>>> >>> data from the topic, the hook I specified should be invoked so I can do
>>> >>> some activity like logging. This would let me measure how long the
>>> >>> global store takes to sync up with the topic after an event has been
>>> >>> put on the topic.
>>> >>>
>>> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>>> >>> wrote:
>>> >>>
>>> >>>> For example, it could be some "static" information, like a mapping from
>>> >>>> zip code to city name.
>>> >>>>
>>> >>>> Something that usually does not change over time.
>>> >>>>
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>> >>>>> Matthias,
>>> >>>>>
>>> >>>>> I am wondering what you mean by: Global stores hold "auxiliary" data
>>> >>>>> that is provided from "outside" of the app.
>>> >>>>>
>>> >>>>> Could you give an example use case of what you mean by auxiliary data
>>> >>>>> provided from outside the app?
>>> >>>>>
>>> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
>>> >>>>>
>>> >>>>>> The two kinds of stores serve different purposes.
>>> >>>>>>
>>> >>>>>> Regular stores allow you to store state the application computes.
>>> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
>>> >>>>>>
>>> >>>>>> Global stores hold "auxiliary" data that is provided from "outside" of
>>> >>>>>> the app. There is no changelog topic, only the input topic (which is
>>> >>>>>> used to re-create the global state).
>>> >>>>>>
>>> >>>>>> Local stores are sharded, and updates are "sync" as they don't need to
>>> >>>>>> be shared with anybody else.
>>> >>>>>>
>>> >>>>>> For global stores, as all instances need to be updated, updates are
>>> >>>>>> async (we don't know when each instance will update its own global
>>> >>>>>> store replica).
>>> >>>>>>
>>> >>>>>>>> Say one stream thread updates the topic for the global store and starts
>>> >>>>>>>> processing the next event, wherein the processor tries to read the global
>>> >>>>>>>> store, which may not have been synced with the topic?
>>> >>>>>>
>>> >>>>>> Correct. There is no guarantee when the update to the global store will
>>> >>>>>> be applied. As said, global stores are not designed to hold data the
>>> >>>>>> application computes.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
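The asynchronous behaviour described above can be illustrated with a toy model (plain Java, not the Kafka Streams API): a write lands in the topic log immediately, but each instance's replica only reflects it after its own global thread has consumed it. `AsyncReplicaModel`, `TopicLog`, `Replica` and `poll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of why a global store is not read-your-writes. */
final class AsyncReplicaModel {

    private AsyncReplicaModel() {}

    /** Append-only log standing in for the global store's input topic. */
    static final class TopicLog {
        final List<String[]> records = new ArrayList<>(); // each entry is {key, value}

        void append(String key, String value) {
            records.add(new String[] {key, value});
        }
    }

    /** One application instance's local copy of the global store. */
    static final class Replica {
        private final Map<String, String> store = new HashMap<>();
        private int nextOffset = 0; // first log position not yet applied

        /** Applies up to maxRecords pending records, like one poll of the global thread. */
        void poll(TopicLog log, int maxRecords) {
            int end = Math.min(log.records.size(), nextOffset + maxRecords);
            for (; nextOffset < end; nextOffset++) {
                String[] r = log.records.get(nextOffset);
                store.put(r[0], r[1]);
            }
        }

        String get(String key) {
            return store.get(key);
        }
    }
}
```

A read between `append` and the next `poll` returns the old value, which is the scenario in the question: a processor that writes to the topic and immediately reads the global store may not see its own write.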
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>> >>>>>>> Thanks... I will try with GlobalKTable.
>>> >>>>>>> As a side question, I didn't really understand the significance of the
>>> >>>>>>> global state store, which kind of works in the reverse way to a local
>>> >>>>>>> state store: a local state store is updated and then saved to the
>>> >>>>>>> changelog topic, whereas a global state store's topic is updated first
>>> >>>>>>> and then synced to the global state store. Do these two work in sync,
>>> >>>>>>> i.e. are the update to the topic and the update to the global state
>>> >>>>>>> store applied together?
>>> >>>>>>>
>>> >>>>>>> Say one stream thread updates the topic for the global store and starts
>>> >>>>>>> processing the next event, wherein the processor tries to read the
>>> >>>>>>> global store, which may not have been synced with the topic?
>>> >>>>>>>
>>> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mjsax@apache.org> wrote:
>>> >>>>>>>
>>> >>>>>>>> Yes.
>>> >>>>>>>>
>>> >>>>>>>> A `GlobalKTable` uses a global store internally.
>>> >>>>>>>>
>>> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>> >>>>>>>>> Thanks Matthias.
>>> >>>>>>>>> Can you elaborate on the replicated caching layer part?
>>> >>>>>>>>> When you say global stores, do you mean a GlobalKTable created from a
>>> >>>>>>>>> topic, e.g. using the StreamsBuilder.globalTable(String topic) method?
>>> >>>>>>>>>
>>> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mjsax@apache.org> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> It's not possible to modify a state store from "outside".
>>> >>>>>>>>>>
>>> >>>>>>>>>> If you want to build a "replicated caching layer", you could use global
>>> >>>>>>>>>> stores and write into the corresponding topics to update all stores.
>>> >>>>>>>>>> Of course, those updates would be async.
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> -Matthias
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>> >>>>>>>>>>> Hi All,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> I am wondering if this is possible: I have been asked to use state
>>> >>>>>>>>>>> stores as a general replicated cache among multiple service instances.
>>> >>>>>>>>>>> However, the state store is created through StreamsBuilder but is not
>>> >>>>>>>>>>> actually modified through the stream processor topology; instead, it
>>> >>>>>>>>>>> is to be modified from outside the stream topology. So, essentially,
>>> >>>>>>>>>>> the state store is just to be created from StreamsBuilder and then
>>> >>>>>>>>>>> used as an application-level cache that gets replicated between
>>> >>>>>>>>>>> application instances. Is this possible using state stores?
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> I appreciate your response, since I don't know the internals of
>>> >>>>>>>>>>> state stores.