Posted to dev@kafka.apache.org by John Roesler <vv...@apache.org> on 2022/04/01 01:59:07 UTC

Re: [DISCUSS] KIP-813 Shared State Stores

Hi Daan,

Thanks for the KIP!

I just got caught up on the discussion. I have some small questions, and then I will be ready to vote.

1. Am I right in thinking that there’s no way to enforce that the stores are actually read-only? It seems like the StoreBuilder interface is too generic for that. If that’s true, I think it’s fine, but we should be sure the JavaDoc clearly states that other processors must not write into these stores (except for the one that feeds it).

 2. Are you planning for these stores to get standbys as well? I would think so, otherwise the desired purpose of standbys (eliminating restoration latency during failover) would not be served. 
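
For reference, standbys themselves are just config; the open question is whether the same setting will cover the new read-only stores. A minimal sketch (application id and bootstrap server are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StandbyConfigSketch {
        static Properties config() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-store-reader");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // One standby replica per task, to cut restoration latency on failover.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            return props;
        }
    }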

Thanks,
John

On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
> Thanks for updating the KIP. LGTM.
>
> I think we can start a vote.
>
>
>>  I think this might cause issues if your processor is doing a projection of the data.
>
> This is correct. It's a known issue: 
> https://issues.apache.org/jira/browse/KAFKA-7663
>
> Global-stores/KTables are designed to put the data into the store 
> _unmodified_.
>
>
> -Matthias
>
> On 2/28/22 5:05 AM, Daan Gertis wrote:
>> Updated the KIP to be more aligned with global state store function names.
>> 
>> If I remember correctly, during restore the processor will not be used, right? I think this might cause issues if your processor is doing a projection of the data. Either way, I would not add that to this KIP since it is a specific use-case pattern.
>> 
>> Unless there is anything more to add or change, I would propose moving to a vote?
>> 
>> Cheers!
>> D.
>> 
>> From: Matthias J. Sax <mj...@apache.org>
>> Date: Friday, 18 February 2022 at 03:29
>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP!
>> 
>> I am wondering if we would need two overloads of `addReadOnlyStateStore`,
>> one w/ and one w/o a `TimestampExtractor` argument, to effectively make it
>> an "optional" parameter?
>> 
>> Also wondering if we need to pass in `String sourceName` and `String
>> processorName` parameters (similar to `addGlobalStore()`?) instead of
>> re-using the store name as currently proposed? -- In general I don't
>> have a strong opinion either way, but it seems to introduce some API
>> inconsistency if we don't follow the `addGlobalStore()` pattern?
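>>
>> For the record, something like this is what I have in mind (just a
>> sketch to make the discussion concrete; the parameter set mirrors
>> `addGlobalStore()` and is not a final proposal):
>>
>>     public synchronized <KIn, VIn> Topology addReadOnlyStateStore(
>>         final StoreBuilder<?> storeBuilder,
>>         final String sourceName,
>>         final Deserializer<KIn> keyDeserializer,
>>         final Deserializer<VIn> valueDeserializer,
>>         final String topic,
>>         final String processorName,
>>         final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);
>>
>>     // Overload making the extractor "optional":
>>     public synchronized <KIn, VIn> Topology addReadOnlyStateStore(
>>         final StoreBuilder<?> storeBuilder,
>>         final String sourceName,
>>         final TimestampExtractor timestampExtractor,
>>         final Deserializer<KIn> keyDeserializer,
>>         final Deserializer<VIn> valueDeserializer,
>>         final String topic,
>>         final String processorName,
>>         final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);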
>> 
>> 
>>> Another thing we were confronted with was the restoring of state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will already be at the latest offset, preventing previous data from being restored within the new pod’s statestore.
>> 
>> We already have code in place in the runtime to do the right thing for
>> this case (ie, via the DSL source-table changelog optimization). We can
>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>> discuss it on the PR later.
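>>
>> For reference, the DSL path that already re-uses the input topic as the
>> changelog looks roughly like this (sketch; topic and store names are
>> placeholders):
>>
>>     final Properties props = new Properties();
>>     // Enables, among others, the source-topic-as-changelog optimization.
>>     props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
>>
>>     final StreamsBuilder builder = new StreamsBuilder();
>>     // With the optimization on, this store restores straight from
>>     // "input-topic" instead of from a dedicated "*-changelog" topic.
>>     final KTable<String, String> table = builder.table(
>>         "input-topic",
>>         Consumed.with(Serdes.String(), Serdes.String()),
>>         Materialized.as("input-table-store"));
>>     builder.build(props); // pass the props so the optimizer actually runs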
>> 
>> 
>> -Matthias
>> 
>> 
>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>> Hi Daan,
>>>
>>> I think for the read-only state stores you'd need to slightly augment the
>>> checkpointing logic so that it would still write the checkpointed offsets
>>> while restoring from the changelogs.
>>>
>>>
>>> Guozhang
>>>
>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dg...@korfinancial.com>
>>> wrote:
>>>
>>>>> Could you add more details about the signature of
>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>> and hard to read.
>>>>>
>>>>> The KIP mentions a `ProcessorProvider` -- do you mean
>>>> `ProcessorSupplier`?
>>>>>
>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>> source `KTables` (ie, `StreamsBuilder#table()` implementation), having
>>>>> timestamp synchronization enabled seems to be important?
>>>>
>>>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>>>> allow for timestamp synchronization.
>>>>
>>>> Another thing we were confronted with was the restoring of state when the
>>>> actual local storage is gone. For example, we host on K8s with ephemeral
>>>> pods, so there is no persisted storage between pod restarts. However, the
>>>> consumer group will already be at the latest offset, preventing previous
>>>> data from being restored within the new pod’s statestore.
>>>>
>>>> If I remember correctly, there was some checkpoint logic available when
>>>> restoring, but we are bypassing that since logging is disabled on the
>>>> statestore, no?
>>>>
>>>> As always, thanks for your insights.
>>>>
>>>> Cheers,
>>>> D.
>>>>
>>>>
>>>> From: Matthias J. Sax <mj...@apache.org>
>>>> Date: Wednesday, 16 February 2022 at 02:09
>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>> Thanks for updating the KIP.
>>>>
>>>> Could you add more details about the signature of
>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>> overloads taking different parameters? The KIP only contains some verbal
>>>> description on the "Implementation Plan" section, that is hard to find
>>>> and hard to read.
>>>>
>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>
>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>> synchronization (similar to global state stores)? It seems to be an
>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>> source `KTables` (ie, `StreamsBuilder#table()` implementation), having
>>>> timestamp synchronization enabled seems to be important?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>>>> Daan,
>>>>>
>>>>> Thanks for the replies, those make sense to me.
>>>>>
>>>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dg...@korfinancial.com>
>>>> wrote:
>>>>>
>>>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>>>
>>>>>> As for your questions Guozhang:
>>>>>>
>>>>>>> 1) How do we handle it if the num.partitions of app A's store changelog
>>>>>>> is different from the num.tasks of app B's sub-topology with that
>>>>>>> read-only store? Or are we going to let each task of B keep a whole
>>>>>>> copy of the store of A by reading all of its changelog partitions, like
>>>>>>> global stores?
>>>>>>
>>>>>> Good question. Both need to be co-partitioned to have the data available.
>>>>>> Another option would be to use IQ to make the request, but that seems far
>>>>>> from ideal.
>>>>>>
>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app B's processing timeline, or, just like what we do for global
>>>>>>> stores, do we just update the read-only stores async?
>>>>>>
>>>>>> Pretty much the same as we do for global stores.
>>>>>>
>>>>>>> 3) If the answers to both of the above questions are the latter, then
>>>>>>> what's the main difference between adding a read-only store vs. adding
>>>>>>> a global store?
>>>>>>
>>>>>> I think because of the first answer the behavior differs from global
>>>>>> stores.
>>>>>>
>>>>>> Makes sense?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> D.
>>>>>>
>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>> Date: Thursday, 20 January 2022 at 21:12
>>>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>> Any processor that would use that materialized, read-only statestore
>>>>>>> would need to wait for the store to be restored. I can't find a way to
>>>>>>> make that possible since processors can't wait for the statestore to be
>>>>>>> restored.
>>>>>>
>>>>>> This is built into the runtime already. Nothing to worry about. It's
>>>>>> part of the regular restore logic -- as long as any store is restoring,
>>>>>> all processing is blocked.
>>>>>>
>>>>>>> Also, since the statestore would have logging disabled, it means there
>>>>>>> is no initial restoration going on.
>>>>>>
>>>>>> No. When we hook up the input topic as changelog (as the DSL does), we
>>>>>> restore from the input topic during the regular restore phase. The
>>>>>> restore logic does not even know it's reading from the input topic
>>>>>> rather than from a "*-changelog" topic.
>>>>>>
>>>>>> Disabling changelogging only affects the write path (ie,
>>>>>> `store.put()`) but not the restore path, due to the internal "hookup" of
>>>>>> the input topic inside the restore logic.
>>>>>>
>>>>>> It's not easy to find/understand by reverse engineering I guess, but
>>>>>> it's there.
>>>>>>
>>>>>> One pointer where the actual hookup happens (might help to dig into it
>>>>>> more if you want):
>>>>>>
>>>>>>
>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>>>> Hello Daan,
>>>>>>>
>>>>>>> Thanks for writing the KIP. I just read through it and just my 2c here:
>>>>>>> to me it seems that one of the goals would be to "externalize" the
>>>>>>> internal changelog topic of an application (say A) so that other
>>>>>>> consumers can directly read it --- though technically without any auth,
>>>>>>> anyone knowing the topic name would be able to write to it too;
>>>>>>> conceptually we would just assume that app A is the only writer of that
>>>>>>> topic --- The question I had is how much we want to externalize the
>>>>>>> topic. For example we could, orthogonally to this KIP, just allow users
>>>>>>> to pass in a customized topic name when constructing a state store,
>>>>>>> indicating that application A should use that as the changelog; and
>>>>>>> since that topic is created outside of A and is publicly visible,
>>>>>>> anyone on that cluster --- including any consumers or streams apps ---
>>>>>>> could read it. This is probably the most flexible option for sharing,
>>>>>>> but we are even less assured that application A is the only writer to
>>>>>>> that external topic unless we have explicit auth for A on that topic.
>>>>>>>
>>>>>>> Aside from that, here are a few more detailed comments about the
>>>>>>> implementation design itself, following your current proposal:
>>>>>>>
>>>>>>> 1) How do we handle it if the num.partitions of app A's store changelog
>>>>>>> is different from the num.tasks of app B's sub-topology with that
>>>>>>> read-only store? Or are we going to let each task of B keep a whole
>>>>>>> copy of the store of A by reading all of its changelog partitions, like
>>>>>>> global stores?
>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app B's processing timeline, or, just like what we do for global
>>>>>>> stores, do we just update the read-only stores async?
>>>>>>> 3) If the answers to both of the above questions are the latter, then
>>>>>>> what's the main difference between adding a read-only store vs. adding
>>>>>>> a global store?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dg...@korfinancial.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Matthias,
>>>>>>>>
>>>>>>>> Thank you for that feedback, certainly some things to think about. Let
>>>>>> me
>>>>>>>> add my thoughts:
>>>>>>>>
>>>>>>>> +1 on simplifying the motivation. Was aiming to add more context but I
>>>>>>>> think you're right, bringing it back to the essence makes more sense.
>>>>>>>>
>>>>>>>> I also follow the reasoning of not having leader and follower. Makes
>>>>>> sense
>>>>>>>> to view it from a single app point of view.
>>>>>>>>
>>>>>>>> As for the API method and its parameters, I wanted to stay close to the
>>>>>>>> API for adding a regular statestore, but I can perfectly see myself
>>>>>>>> defining an addReadOnlyStateStore() method instead.
>>>>>>>>
>>>>>>>> I agree the processor approach would be the most flexible one, and
>>>>>> surely
>>>>>>>> it allows you to use a processor to base the statestore off an
>>>> existing
>>>>>>>> topic. From what I understood from the codebase, there might be a
>>>>>> problem
>>>>>>>> when using that statestore. Any processor that would use that
>>>>>> materialized,
>>>>>>>> read-only statestore would need to wait for the store to be restored.
>>>> I
>>>>>>>> can't find a way to make that possible since processors can't wait for
>>>>>> the
>>>>>>>> statestore to be restored. Also, since the statestore would have
>>>> logging
>>>>>>>> disabled, it means there is no initial restoration going on. As you
>>>>>> wrote,
>>>>>>>> the DSL is already doing this, so I'm pretty sure I'm missing
>>>> something,
>>>>>>>> just unable to find what exactly.
>>>>>>>>
>>>>>>>> I will rewrite the parts in the KIP to make processor-based the preferred
>>>>>>>> choice, along with the changes to the motivation etc. The only thing to
>>>>>>>> figure out is the restore behavior, to be sure processors of the
>>>>>>>> read-only statestore aren't working with stale data.
>>>>>>>>
>>>>>>>> D.
>>>>>>>>
>>>>>>>> -----Original Message-----
>>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>>> Sent: 19 January 2022 21:31
>>>>>>>> To: dev@kafka.apache.org
>>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>>
>>>>>>>> Daan,
>>>>>>>>
>>>>>>>> thanks for the KIP. I personally find the motivation section a little
>>>>>> bit
>>>>>>>> confusing. If I understand the KIP correctly, you want to read a topic
>>>>>> into
>>>>>>>> a state store (ie, materialize it). This is already possible today.
>>>>>>>>
>>>>>>>> Of course, today a "second" changelog topic would be created. It seems the
>>>>>>>> KIP aims to avoid the additional changelog topic, and to allow re-using
>>>>>>>> the original input topic (this optimization is already available for the
>>>>>>>> DSL, but not for the PAPI).
>>>>>>>>
>>>>>>>> If my observation is correct, we can simplify the motivation
>>>> accordingly
>>>>>>>> (the fact that you want to use this feature to share state across
>>>>>> different
>>>>>>>> applications more efficiently seems to be secondary and we could omit
>>>> it
>>>>>>>> IMHO to keep the motivation focused).
>>>>>>>>
>>>>>>>> As a result, we also don't need the concept of "leader" and "follower".
>>>>>>>> In the end, Kafka Streams cannot reason about/enforce any usage patterns
>>>>>>>> across different apps; we can only guarantee stuff within a single
>>>>>>>> application (ie, don't create a changelog but reuse an input topic as
>>>>>>>> changelog). It would simplify the KIP if we remove these parts.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> For the API, I am wondering why you propose to pass in `processorNames`?
>>>>>>>> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
>>>>>>>> (similar to what we do for `addGlobalStore`)? The provided `Processor`
>>>>>>>> must implement a certain pattern, ie, take each input record and apply it
>>>>>>>> unmodified to the state store (ie, the Processor will be solely
>>>>>>>> responsible for maintaining the state store). We might also need to pass
>>>>>>>> other arguments into this method, similar to `addGlobalStore`. (More below.)
>>>>>>>>
>>>>>>>> If other processors need to read the state store, they can be connected
>>>>>>>> to it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
>>>>>>>> approach to keep `processorName` would also be possible, but IMHO all
>>>>>>>> those should only _read_ the state store (but not modify it), to keep a
>>>>>>>> clear conceptual separation.
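>>>>>>>>
>>>>>>>> To make the pattern concrete, a rough sketch with today's PAPI (all
>>>>>>>> names are placeholders; this is plain wiring as it exists today, not a
>>>>>>>> proposed API):
>>>>>>>>
>>>>>>>>     import org.apache.kafka.common.serialization.Serdes;
>>>>>>>>     import org.apache.kafka.streams.Topology;
>>>>>>>>     import org.apache.kafka.streams.processor.api.Processor;
>>>>>>>>     import org.apache.kafka.streams.processor.api.ProcessorContext;
>>>>>>>>     import org.apache.kafka.streams.processor.api.Record;
>>>>>>>>     import org.apache.kafka.streams.state.KeyValueStore;
>>>>>>>>     import org.apache.kafka.streams.state.Stores;
>>>>>>>>
>>>>>>>>     public class SharedStoreWiring {
>>>>>>>>
>>>>>>>>         // The state-update processor: applies each record unmodified.
>>>>>>>>         static class Updater implements Processor<String, String, Void, Void> {
>>>>>>>>             private KeyValueStore<String, String> store;
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void init(final ProcessorContext<Void, Void> context) {
>>>>>>>>                 store = context.getStateStore("shared-store");
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void process(final Record<String, String> record) {
>>>>>>>>                 store.put(record.key(), record.value()); // no projection
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         public static Topology build() {
>>>>>>>>             final Topology topology = new Topology();
>>>>>>>>             topology.addSource("shared-source", Serdes.String().deserializer(),
>>>>>>>>                 Serdes.String().deserializer(), "shared-topic");
>>>>>>>>             topology.addProcessor("updater", Updater::new, "shared-source");
>>>>>>>>             topology.addStateStore(
>>>>>>>>                 Stores.keyValueStoreBuilder(
>>>>>>>>                     Stores.persistentKeyValueStore("shared-store"),
>>>>>>>>                     Serdes.String(), Serdes.String()),
>>>>>>>>                 "updater");
>>>>>>>>             // Readers attach explicitly and, by convention, only read:
>>>>>>>>             // topology.connectProcessorAndStateStores("some-reader", "shared-store");
>>>>>>>>             return topology;
>>>>>>>>         }
>>>>>>>>     }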
>>>>>>>>
>>>>>>>> About the method name: wondering if we should use a different name to be
>>>>>>>> more explicit about what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Btw: please omit any code snippets and only put the newly added method
>>>>>>>> signature in the KIP.
>>>>>>>>
>>>>>>>> What I don't yet understand is the section "Allow state stores to
>>>>>>>> continue listening for changes from their changelog". Can you
>>>> elaborate?
>>>>>>>>
>>>>>>>> About:
>>>>>>>>
>>>>>>>>> Since a changelog topic is created with the application id in its name,
>>>>>>>>> it would allow us to check in the follower if the changelog topic starts
>>>>>>>>> with our application id. If it doesn’t, we are not allowed to send a log.
>>>>>>>>
>>>>>>>> The DSL implements this differently, and just disables the changelog for
>>>>>>>> the state store (ie, for the "follower"). We could do the same thing
>>>>>>>> (either enforcing that the provided `StoreBuilder` has changelogging
>>>>>>>> disabled, or by just ignoring it and disabling it hard-coded).
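>>>>>>>>
>>>>>>>> The enforcing variant could be as simple as (sketch only; the error
>>>>>>>> message and call site are hypothetical):
>>>>>>>>
>>>>>>>>     // Inside addReadOnlyStateStore(): reject builders with logging enabled.
>>>>>>>>     if (storeBuilder.loggingEnabled()) {
>>>>>>>>         throw new TopologyException(
>>>>>>>>             "Read-only state stores must disable changelogging"
>>>>>>>>                 + " via StoreBuilder#withLoggingDisabled()");
>>>>>>>>     }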
>>>>>>>>
>>>>>>>>
>>>>>>>> Ie, overall I would prefer the "source-processor approach" that you put
>>>>>>>> into rejected alternatives. Note that the problem you call out, namely
>>>>>>>>
>>>>>>>>> Problem with this approach is the lack of having restoring support
>>>>>>>>> within the state store
>>>>>>>>
>>>>>>>> does not apply. A restore is absolutely possible and the DSL already
>>>>>>>> supports it.
>>>>>>>>
>>>>>>>>
>>>>>>>> Or is your concern with regard to performance? The "source-processor
>>>>>>>> approach" would have the disadvantage that input data is first
>>>>>>>> deserialized, fed into the Processor, and then serialized again when put
>>>>>>>> into the state store. Re-using the state restore code is a good idea
>>>>>>>> from a performance point of view, but it might require quite some
>>>>>>>> internal changes (your proposal to "not stop restoring" might not work,
>>>>>>>> as it could trigger quite some undesired side-effects given the current
>>>>>>>> architecture of Kafka Streams).
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>>>> Hey everyone,
>>>>>>>>>
>>>>>>>>> Just created a KIP on sharing statestore state across multiple
>>>>>>>> applications without duplicating the data on multiple changelog
>>>> topics.
>>>>>>>> Have a look and tell me what you think or what to improve. This is my
>>>>>> first
>>>>>>>> one, so please be gentle 😉
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>>>
>>>>>>>>> Cheers!
>>>>>>>>> D.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>

Re: [DISCUSS] KIP-813 Shared State Stores

Posted by Daan Gertis <dg...@korfinancial.com>.
Hey everyone!

Thank you for participating.

The KIP-813 vote has passed with:

binding +1s (John, Matthias, Bill)
non-binding +1s (Daan, Federico)

Cheers,
D.


From: John Roesler <vv...@apache.org>
Date: Friday, 1 April 2022 at 15:54
To: dev@kafka.apache.org <de...@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for the replies, Daan,

That all sounds good to me. I think standbys will probably come naturally, but we should make sure the implementation includes an integration test to verify that. Anyway, I just wanted to make sure we were on the same page.

Thanks again,
John

On Fri, Apr 1, 2022, at 08:16, Daan Gertis wrote:
> Hey John,
>
>
>   *   1. Am I right in thinking that there’s no way to enforce that the
> stores are actually read-only? It seems like the StoreBuilder
> interface is too generic for that. If that’s true, I think it’s fine,
> but we should be sure the JavaDoc clearly states that other processors
> must not write into these stores (except for the one that feeds it).
>
> Yeah I couldn’t really find a way to limit it easily. We might be able
> to throw unsupported exceptions by wrapping the statestore, but that
> seems kind of brittle and feels a bit like a hack.
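>
> Something like this wrapper is what I had in mind (a rough sketch against
> the 3.x `KeyValueStore`/`StateStore` interfaces; the class itself is made
> up, and blocking the write path is the only point):
>
>     import java.util.List;
>     import org.apache.kafka.streams.KeyValue;
>     import org.apache.kafka.streams.processor.StateStore;
>     import org.apache.kafka.streams.state.KeyValueIterator;
>     import org.apache.kafka.streams.state.KeyValueStore;
>
>     /** Delegates all reads to the wrapped store; rejects every write. */
>     public class ReadOnlyStoreFacade<K, V> implements KeyValueStore<K, V> {
>         private final KeyValueStore<K, V> inner;
>
>         public ReadOnlyStoreFacade(final KeyValueStore<K, V> inner) {
>             this.inner = inner;
>         }
>
>         // Write path: always rejected.
>         @Override public void put(final K k, final V v) { throw fail(); }
>         @Override public V putIfAbsent(final K k, final V v) { throw fail(); }
>         @Override public void putAll(final List<KeyValue<K, V>> entries) { throw fail(); }
>         @Override public V delete(final K k) { throw fail(); }
>
>         private UnsupportedOperationException fail() {
>             return new UnsupportedOperationException("store is read-only");
>         }
>
>         // Read path and lifecycle: plain delegation.
>         @Override public V get(final K k) { return inner.get(k); }
>         @Override public KeyValueIterator<K, V> range(final K from, final K to) { return inner.range(from, to); }
>         @Override public KeyValueIterator<K, V> all() { return inner.all(); }
>         @Override public long approximateNumEntries() { return inner.approximateNumEntries(); }
>         @Override public String name() { return inner.name(); }
>         @SuppressWarnings("deprecation")
>         @Override public void init(final org.apache.kafka.streams.processor.ProcessorContext ctx,
>                                    final StateStore root) { inner.init(ctx, root); }
>         @Override public void flush() { inner.flush(); }
>         @Override public void close() { inner.close(); }
>         @Override public boolean persistent() { return inner.persistent(); }
>         @Override public boolean isOpen() { return inner.isOpen(); }
>     }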
>
> Also, the function name clearly states it should be considered read-only.
>
>
>   *    2. Are you planning for these stores to get standbys as well? I
> would think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
>
> Yeah I think standbys should be applicable here as well. But we get
> that by implementing these read-only statestores as regular ones, right?
>
> Cheers,
> D.
>
>
> From: John Roesler <vv...@apache.org>
> Date: Friday, 1 April 2022 at 04:01
> To: dev@kafka.apache.org <de...@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Hi Daan,
>
> Thanks for the KIP!
>
> I just got caught up on the discussion. I just have a some small
> questions, and then I will be ready to vote.
>
> 1. Am I right I’m thinking that there’s no way to enforce the stores
> are actually read-only, right? It seems like the StoreBuilder interface
> is too generic for that. If that’s true, I think it’s fine, but we
> should be sure the JavaDoc clearly states that other processors must
> not write into these stores (except for the one that feeds it).
>
>  2. Are you planning for these stores to get standbys as well? I would
> think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
>
> Thanks,
> John
>
> On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
>> Thanks for updating the KIP. LGTM.
>>
>> I think we can start a vote.
>>
>>
>>>  I think this might provide issues if your processor is doing a projection of the data.
>>
>> This is correct. It's a know issue:
>> https://issues.apache.org/jira/browse/KAFKA-7663
>>
>> Global-stores/KTables are designed to put the data into the store
>> _unmodified_.
>>
>>
>> -Matthias
>>
>> On 2/28/22 5:05 AM, Daan Gertis wrote:
>>> Updated the KIP to be more aligned with global state store function names.
>>>
>>> If I remember correctly during restore the processor will not be used right? I think this might provide issues if your processor is doing a projection of the data. Either way, I would not add that into this KIP since it is a specific use-case pattern.
>>>
>>> Unless there is anything more to add or change, I would propose moving to a vote?
>>>
>>> Cheers!
>>> D.
>>>
>>> From: Matthias J. Sax <mj...@apache.org>
>>> Date: Friday, 18 February 2022 at 03:29
>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>> Thanks for updating the KIP!
>>>
>>> I am wondering if we would need two overloads of `addReadOnlyStateStore`
>>> one w/ and one w/o `TimestampExtractor` argument to effectively make it
>>> an "optional" parameter?
>>>
>>> Also wondering if we need to pass in a `String sourceName` and `String
>>> processorName` parameters (similar to `addGlobalStore()`?) instead if
>>> re-using the store name as currently proposed? -- In general I don't
>>> have a strong opinion either way, but it seems to introduce some API
>>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>>
>>>
>>>> Another thing we were confronted with was the restoring of state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will be already been at the latest offset, preventing from previous data to be restored within the new pod’s statestore.
>>>
>>> We have already code in-place in the runtime to do the right thing for
>>> this case (ie, via DSL source-table changelog optimization). We can
>>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>>> discuss on the PR later.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>>> Hi Daan,
>>>>
>>>> I think for the read-only state stores you'd need ot slightly augment the
>>>> checkpointing logic so that it would still write the checkpointed offsets
>>>> while restoring from the changelogs.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dg...@korfinancial.com>
>>>> wrote:
>>>>
>>>>>> Could you add more details about the signature of
>>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>>> and hard to read.
>>>>>>
>>>>>> The KIP mentions a `ProcessorProvider` -- do you mean
>>>>> `ProcessorSupplier`?
>>>>>>
>>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>>>>> timestamp synchronization enabled seems to be important?
>>>>>
>>>>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>>>>> have allow for timestamp synchronization.
>>>>>
>>>>> Another thing we were confronted with was the restoring of state when the
>>>>> actual local storage is gone. For example, we host on K8s with ephemeral
>>>>> pods, so there is no persisted storage between pod restarts. However, the
>>>>> consumer group will be already been at the latest offset, preventing from
>>>>> previous data to be restored within the new pod’s statestore.
>>>>>
>>>>> If I remember correctly, there was some checkpoint logic available when
>>>>> restoring, but we are bypassing that since logging is disabled on the
>>>>> statestore, no?
>>>>>
>>>>> As always, thanks for your insights.
>>>>>
>>>>> Cheers,
>>>>> D.
>>>>>
>>>>>
>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>> Date: Wednesday, 16 February 2022 at 02:09
>>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>> Thanks for updating the KIP.
>>>>>
>>>>> Could you add more details about the signature of
>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>> and hard to read.
>>>>>
>>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>>
>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>>>> timestamp synchronization enabled seems to be important?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>>>>> Daan,
>>>>>>
>>>>>> Thanks for the replies, those make sense to me.
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dg...@korfinancial.com>
>>>>> wrote:
>>>>>>
>>>>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>>>>
>>>>>>> As for your questions Guozhang:
>>>>>>>
>>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>>> read-only
>>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>>> store
>>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>>
>>>>>>> Good question. Both need to be co-partitioned to have the data
>>>>> available.
>>>>>>> Another option would be to use IQ to make the request, but that seems
>>>>> far
>>>>>>> from ideal.
>>>>>>>
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app
>>>>>>>> B's processing timelines, or just like what we do for global stores
>>>>> that
>>>>>>> we
>>>>>>>> just update the read-only stores async?
>>>>>>>
>>>>>>> Pretty much the same as we do for global stores.
>>>>>>>
>>>>>>>> 3) If the answer to both of the above questions are the latter, then
>>>>>>> what's
>>>>>>>> the main difference of adding a read-only store v.s. adding a global
>>>>>>> store?
>>>>>>>
>>>>>>> I think because of the first answer the behavior differs from global
>>>>>>> stores.
>>>>>>>
>>>>>>> Makes sense?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> D.
>>>>>>>
>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>> Date: Thursday, 20 January 2022 at 21:12
>>>>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>> Any processor that would use that materialized, read-only statestore
>>>>>>> would need to wait for the store to be restored. I can't find a way to
>>>>> make
>>>>>>> that possible since processors can't wait for the statestore to be
>>>>> restored.
>>>>>>>
>>>>>>> This is built into the runtime already. Nothing to worry about. It's
>>>>>>> part of the regular restore logic -- as long as any store is restoring,
>>>>>>> all processing is blocked.
>>>>>>>
>>>>>>>> Also, since the statestore would have logging disabled, it means there
>>>>>>> is no initial restoration going on.
>>>>>>>
>>>>>>> No. When we hookup the input topic as changelog (as the DSL does) we
>>>>>>> restore from the input topic during regular restore phase. The restore
>>>>>>> logic does not even know it's reading from the input topic, but not from
>>>>>>> a "*-changelog" topic).
>>>>>>>
>>>>>>> Disabling changelogging does only affect the write path (ie,
>>>>>>> `store.put()`) but not the restore path due to the internal "hookup" of
>>>>>>> the input topic inside the restore logic.
>>>>>>>
>>>>>>> It's not easy to find/understand by reverse engineering I guess, but
>>>>>>> it's there.
>>>>>>>
>>>>>>> One pointer where the actual hookup happens (might help to dig into it
>>>>>>> more if you want):
>>>>>>>
>>>>>>>
>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>>>>> Hello Daan,
>>>>>>>>
>>>>>>>> Thanks for writing the KIP. I just read through it and just my 2c here:
>>>>>>> to
>>>>>>>> me it seems that one of the goal would be to "externalize" the internal
>>>>>>>> changelog topic of an application (say A) so that other consumers can
>>>>>>>> directly read them --- though technically without any auth, anyone
>>>>>>> knowing
>>>>>>>> the topic name would be able to write to it too, conceptually we would
>>>>>>> just
>>>>>>>> assume that app A is the only writer of that topic --- The question I
>>>>> had
>>>>>>>> is how much we want to externalize the topic. For example we can,
>>>>>>>> orthogonally to this KIP, just allow users to pass in a customized
>>>>> topic
>>>>>>>> name when constructing a state store, indicating the application A to
>>>>> use
>>>>>>>> that as the changelog, and since that topic is created outside of A and
>>>>>>> is
>>>>>>>> publicly visible to anyone else on that cluster, anyone --- including
>>>>> any
>>>>>>>> consumers, or streams apps. This is probably most flexible as for
>>>>>>> sharing,
>>>>>>>> but we are even less assured that if application A is the only writer
>>>>> to
>>>>>>>> that external topic unless we have explicit auth for A on that topic.
>>>>>>>>
>>>>>>>> Aside of that, here are a few more detailed comments about the
>>>>>>>> implementation design itself following your current proposal:
>>>>>>>>
>>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>>> read-only
>>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>>> store
>>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app
>>>>>>>> B's processing timelines, or just like what we do for global stores
>>>>> that
>>>>>>> we
>>>>>>>> just update the read-only stores async?
>>>>>>>> 3) If the answer to both of the above questions are the latter, then
>>>>>>> what's
>>>>>>>> the main difference of adding a read-only store v.s. adding a global
>>>>>>> store?
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dg...@korfinancial.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Matthias,
>>>>>>>>>
>>>>>>>>> Thank you for that feedback, certainly some things to think about. Let
>>>>>>> me
>>>>>>>>> add my thoughts:
>>>>>>>>>
>>>>>>>>> +1 on simplifying the motivation. Was aiming to add more context but I
>>>>>>>>> think you're right, bringing it back to the essence makes more sense.
>>>>>>>>>
>>>>>>>>> I also follow the reasoning of not having leader and follower. Makes
>>>>>>> sense
>>>>>>>>> to view it from a single app point of view.
>>>>>>>>>
>>>>>>>>> As for the API method and its parameters, I wanted to stay close to
>>>>> the
>>>>>>>>> API for adding a regular statestore, but I can perfectly find myself
>>>>> in
>>>>>>>>> defining an addReadOnlyStateStore() method instead.
>>>>>>>>>
>>>>>>>>> I agree the processor approach would be the most flexible one, and
>>>>>>> surely
>>>>>>>>> it allows you to use a processor to base the statestore off an
>>>>> existing
>>>>>>>>> topic. From what I understood from the codebase, there might be a
>>>>>>> problem
>>>>>>>>> when using that statestore. Any processor that would use that
>>>>>>> materialized,
>>>>>>>>> read-only statestore would need to wait for the store to be restored.
>>>>> I
>>>>>>>>> can't find a way to make that possible since processors can't wait for
>>>>>>> the
>>>>>>>>> statestore to be restored. Also, since the statestore would have
>>>>> logging
>>>>>>>>> disabled, it means there is no initial restoration going on. As you
>>>>>>> wrote,
>>>>>>>>> the DSL is already doing this, so I'm pretty sure I'm missing
>>>>> something,
>>>>>>>>> just unable to find what exactly.
>>>>>>>>>
>>>>>>>>> I will rewrite the parts in the KIP to make processor-based the
>>>>>>> preferred
>>>>>>>>> choice, along with the changes to the motivation etc. Only thing to
>>>>>>> figure
>>>>>>>>> out is that restoring behavior to be sure processors of the readonly
>>>>>>>>> statestore aren't working with stale data.
>>>>>>>>>
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>>>> Sent: 19 January 2022 21:31
>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>>>
>>>>>>>>> Daan,
>>>>>>>>>
>>>>>>>>> thanks for the KIP. I personally find the motivation section a little
>>>>>>> bit
>>>>>>>>> confusing. If I understand the KIP correctly, you want to read a topic
>>>>>>> into
>>>>>>>>> a state store (ie, materialize it). This is already possible today.
>>>>>>>>>
>>>>>>>>> Of course, today a "second" changelog topic would be created. It seems
>>>>>>> the
>>>>>>>>> KIP aims to avoid the additional changelog topic, and to allow to
>>>>> re-use
>>>>>>>>> the original input topic (this optimization is already available for
>>>>> the
>>>>>>>>> DSL, but not for the PAPI).
>>>>>>>>>
>>>>>>>>> If my observation is correct, we can simplify the motivation
>>>>> accordingly
>>>>>>>>> (the fact that you want to use this feature to share state across
>>>>>>> different
>>>>>>>>> applications more efficiently seems to be secondary and we could omit
>>>>> it
>>>>>>>>> IMHO to keep the motivation focused).
>>>>>>>>>
>>>>>>>>> As a result, we also don't need to concept of "leader" and "follower".
>>>>>>>>> In the end, Kafka Streams cannot reason/enforce any usage patterns
>>>>>>> across
>>>>>>>>> different apps, but we can only guarantee stuff within a single
>>>>>>> application
>>>>>>>>> (ie, don't create a changelog but reuse an input topic as changelog).
>>>>> It
>>>>>>>>> would simplify the KIP if we remove these parts.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For the API, I am wondering why you propose to pass in
>>>>> `processorNames`?
>>>>>>>>> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
>>>>>>>>> (similar to what we do for `addGlobalStore`)? The provided `Processor`
>>>>>>> must
>>>>>>>>> implement a certain pattern, ie, take each input record an apply it
>>>>>>>>> unmodified to the state store (ie, the Processor will be solely
>>>>>>> responsible
>>>>>>>>> to maintain the state store). We might also need to pass in other
>>>>>>> argument
>>>>>>>>> similar to `addGlobalStore` into this method). (More below.)
>>>>>>>>>
>>>>>>>>> If other processors need to read the state store, they can be
>>>>> connected
>>>>>>> to
>>>>>>>>> it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
>>>>>>>>> approach to keep `processorName` would also be possible, but IMHO all
>>>>>>> those
>>>>>>>>> should only _read_ the state store (but not modify it), to keep a
>>>>> clear
>>>>>>>>> conceptual separation.
>>>>>>>>>
>>>>>>>>> About the method name: wondering if we should use a different name to
>>>>> be
>>>>>>>>> more explicit what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Btw: please omit any code snippets and only put the newly added method
>>>>>>>>> signature in the KIP.
>>>>>>>>>
>>>>>>>>> What I don't yet understand is the section "Allow state stores to
>>>>>>>>> continue listening for changes from their changelog". Can you
>>>>> elaborate?
>>>>>>>>>
>>>>>>>>> About:
>>>>>>>>>
>>>>>>>>>> Since a changelog topic is created with the application id in it’s
>>>>>>> name,
>>>>>>>>> it would allow us to check in the follower if the changelog topic
>>>>> starts
>>>>>>>>> with our application id. If it doesn’t, we are not allowed to send a
>>>>>>> log.
>>>>>>>>>
>>>>>>>>> The DSL implements this differently, and just disabled the changelog
>>>>> for
>>>>>>>>> the state store (ie, for the "follower"). We could do the same thing
>>>>>>>>> (either enforcing that the provided `StoreBuilder` has changelogging
>>>>>>>>> disabled, or by just ignoring it and disabled it hard coded).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Ie, overall I would prefer the "source-procssor appraoch" that you put
>>>>>>>>> into rejected alternatives. Note that the problem you call out, namely
>>>>>>>>>
>>>>>>>>>> Problem with this approach is the lack of having restoring support
>>>>>>>>> within the state store
>>>>>>>>>
>>>>>>>>> does not apply. A restore it absolutely possible and the DSL already
>>>>>>>>> supports it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Or is your concern with regard to performance? The "source-processor
>>>>>>>>> approach" would have the disadvantage that input data is first
>>>>>>>>> deserialized, fed into the Processor, and than serialized again when
>>>>> put
>>>>>>>>> into the state store. Re-using the state restore code is a good idea
>>>>>>>>> from a performance point of view, but it might require quite some
>>>>>>>>> internal changes (your proposal to "not stop restoring" might not work
>>>>>>>>> as it could trigger quite some undesired side-effects given the
>>>>> current
>>>>>>>>> architecture of Kafka Streams).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>>>>> Hey everyone,
>>>>>>>>>>
>>>>>>>>>> Just created a KIP on sharing statestore state across multiple
>>>>>>>>> applications without duplicating the data on multiple changelog
>>>>> topics.
>>>>>>>>> Have a look and tell me what you think or what to improve. This is my
>>>>>>> first
>>>>>>>>> one, so please be gentle ??
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>>>>
>>>>>>>>>> Cheers!
>>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>

Re: [DISCUSS] KIP-813 Shared State Stores

Posted by John Roesler <vv...@apache.org>.
Thanks for the replies, Daan,

That all sounds good to me. I think standbys will probably come naturally, but we should make sure the implementation includes an integration test to make sure. Anyway, I just wanted to make sure we were on the same page. 

Thanks again,
John

On Fri, Apr 1, 2022, at 08:16, Daan Gertis wrote:
> Hey John,
>
>
>   *   1. Am I right I’m thinking that there’s no way to enforce the 
> stores are actually read-only, right? It seems like the StoreBuilder 
> interface is too generic for that. If that’s true, I think it’s fine, 
> but we should be sure the JavaDoc clearly states that other processors 
> must not write into these stores (except for the one that feeds it).
>
> Yeah I couldn’t really find a way to limit it easily. We might be able 
> to throw unsupported exceptions by wrapping the statestore, but that 
> seems kind of brittle to do and feels a bit like a hack.
>
> Also, the function name clearly states it should be considered readonly.
>
>
>   *    2. Are you planning for these stores to get standbys as well? I 
> would think so, otherwise the desired purpose of standbys (eliminating 
> restoration latency during failover) would not be served.
>
> Yeah I think standbys should be applicable here as well. But we get 
> that by implementing these readonly statestores as regular ones right?
>
> Cheers,
> D.
>
>
> From: John Roesler <vv...@apache.org>
> Date: Friday, 1 April 2022 at 04:01
> To: dev@kafka.apache.org <de...@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Hi Daan,
>
> Thanks for the KIP!
>
> I just got caught up on the discussion. I just have a some small 
> questions, and then I will be ready to vote.
>
> 1. Am I right I’m thinking that there’s no way to enforce the stores 
> are actually read-only, right? It seems like the StoreBuilder interface 
> is too generic for that. If that’s true, I think it’s fine, but we 
> should be sure the JavaDoc clearly states that other processors must 
> not write into these stores (except for the one that feeds it).
>
>  2. Are you planning for these stores to get standbys as well? I would 
> think so, otherwise the desired purpose of standbys (eliminating 
> restoration latency during failover) would not be served.
>
> Thanks,
> John
>
> On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
>> Thanks for updating the KIP. LGTM.
>>
>> I think we can start a vote.
>>
>>
>>>  I think this might provide issues if your processor is doing a projection of the data.
>>
>> This is correct. It's a know issue:
>> https://issues.apache.org/jira/browse/KAFKA-7663
>>
>> Global-stores/KTables are designed to put the data into the store
>> _unmodified_.
>>
>>
>> -Matthias
>>
>> On 2/28/22 5:05 AM, Daan Gertis wrote:
>>> Updated the KIP to be more aligned with global state store function names.
>>>
>>> If I remember correctly during restore the processor will not be used right? I think this might provide issues if your processor is doing a projection of the data. Either way, I would not add that into this KIP since it is a specific use-case pattern.
>>>
>>> Unless there is anything more to add or change, I would propose moving to a vote?
>>>
>>> Cheers!
>>> D.
>>>
>>> From: Matthias J. Sax <mj...@apache.org>
>>> Date: Friday, 18 February 2022 at 03:29
>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>> Thanks for updating the KIP!
>>>
>>> I am wondering if we would need two overloads of `addReadOnlyStateStore`
>>> one w/ and one w/o `TimestampExtractor` argument to effectively make it
>>> an "optional" parameter?
>>>
>>> Also wondering if we need to pass in a `String sourceName` and `String
>>> processorName` parameters (similar to `addGlobalStore()`?) instead if
>>> re-using the store name as currently proposed? -- In general I don't
>>> have a strong opinion either way, but it seems to introduce some API
>>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>>
>>>
>>>> Another thing we were confronted with was the restoring of state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will be already been at the latest offset, preventing from previous data to be restored within the new pod’s statestore.
>>>
>>> We have already code in-place in the runtime to do the right thing for
>>> this case (ie, via DSL source-table changelog optimization). We can
>>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>>> discuss on the PR later.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>>> Hi Daan,
>>>>
>>>> I think for the read-only state stores you'd need ot slightly augment the
>>>> checkpointing logic so that it would still write the checkpointed offsets
>>>> while restoring from the changelogs.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dg...@korfinancial.com>
>>>> wrote:
>>>>
>>>>>> Could you add more details about the signature of
>>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>>> and hard to read.
>>>>>>
>>>>>> The KIP mentions a `ProcessorProvider` -- do you mean
>>>>> `ProcessorSupplier`?
>>>>>>
>>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>>>>> timestamp synchronization enabled seems to be important?
>>>>>
>>>>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>>>>> have allow for timestamp synchronization.
>>>>>
>>>>> Another thing we were confronted with was the restoring of state when the
>>>>> actual local storage is gone. For example, we host on K8s with ephemeral
>>>>> pods, so there is no persisted storage between pod restarts. However, the
>>>>> consumer group will be already been at the latest offset, preventing from
>>>>> previous data to be restored within the new pod’s statestore.
>>>>>
>>>>> If I remember correctly, there was some checkpoint logic available when
>>>>> restoring, but we are bypassing that since logging is disabled on the
>>>>> statestore, no?
>>>>>
>>>>> As always, thanks for your insights.
>>>>>
>>>>> Cheers,
>>>>> D.
>>>>>
>>>>>
>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>> Date: Wednesday, 16 February 2022 at 02:09
>>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>> Thanks for updating the KIP.
>>>>>
>>>>> Could you add more details about the signature of
>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>> description on the "Implementation Plan" section, that is hard to find
>>>>> and hard to read.
>>>>>
>>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>>
>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>> source `KTables` (ie, `StreamsBuilder#table()` implemenation), having
>>>>> timestamp synchronization enabled seems to be important?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>>>>> Daan,
>>>>>>
>>>>>> Thanks for the replies, those make sense to me.
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dg...@korfinancial.com>
>>>>> wrote:
>>>>>>
>>>>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>>>>
>>>>>>> As for your questions Guozhang:
>>>>>>>
>>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>>> read-only
>>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>>> store
>>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>>
>>>>>>> Good question. Both need to be co-partitioned to have the data
>>>>> available.
>>>>>>> Another option would be to use IQ to make the request, but that seems
>>>>> far
>>>>>>> from ideal.
>>>>>>>
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app
>>>>>>>> B's processing timelines, or just like what we do for global stores
>>>>> that
>>>>>>> we
>>>>>>>> just update the read-only stores async?
>>>>>>>
>>>>>>> Pretty much the same as we do for global stores.
>>>>>>>
>>>>>>>> 3) If the answer to both of the above questions are the latter, then
>>>>>>> what's
>>>>>>>> the main difference of adding a read-only store v.s. adding a global
>>>>>>> store?
>>>>>>>
>>>>>>> I think because of the first answer the behavior differs from global
>>>>>>> stores.
>>>>>>>
>>>>>>> Makes sense?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> D.
>>>>>>>
>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>> Date: Thursday, 20 January 2022 at 21:12
>>>>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>> Any processor that would use that materialized, read-only statestore
>>>>>>> would need to wait for the store to be restored. I can't find a way to
>>>>> make
>>>>>>> that possible since processors can't wait for the statestore to be
>>>>> restored.
>>>>>>>
>>>>>>> This is built into the runtime already. Nothing to worry about. It's
>>>>>>> part of the regular restore logic -- as long as any store is restoring,
>>>>>>> all processing is blocked.
>>>>>>>
>>>>>>>> Also, since the statestore would have logging disabled, it means there
>>>>>>> is no initial restoration going on.
>>>>>>>
>>>>>>> No. When we hookup the input topic as changelog (as the DSL does) we
>>>>>>> restore from the input topic during regular restore phase. The restore
>>>>>>> logic does not even know it's reading from the input topic, but not from
>>>>>>> a "*-changelog" topic).
>>>>>>>
>>>>>>> Disabling changelogging does only affect the write path (ie,
>>>>>>> `store.put()`) but not the restore path due to the internal "hookup" of
>>>>>>> the input topic inside the restore logic.
>>>>>>>
>>>>>>> It's not easy to find/understand by reverse engineering I guess, but
>>>>>>> it's there.
>>>>>>>
>>>>>>> One pointer where the actual hookup happens (might help to dig into it
>>>>>>> more if you want):
>>>>>>>
>>>>>>>
>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>>>>> Hello Daan,
>>>>>>>>
>>>>>>>> Thanks for writing the KIP. I just read through it and just my 2c here:
>>>>>>> to
>>>>>>>> me it seems that one of the goal would be to "externalize" the internal
>>>>>>>> changelog topic of an application (say A) so that other consumers can
>>>>>>>> directly read them --- though technically without any auth, anyone
>>>>>>> knowing
>>>>>>>> the topic name would be able to write to it too, conceptually we would
>>>>>>> just
>>>>>>>> assume that app A is the only writer of that topic --- The question I
>>>>> had
>>>>>>>> is how much we want to externalize the topic. For example we can,
>>>>>>>> orthogonally to this KIP, just allow users to pass in a customized
>>>>> topic
>>>>>>>> name when constructing a state store, indicating the application A to
>>>>> use
>>>>>>>> that as the changelog, and since that topic is created outside of A and
>>>>>>> is
>>>>>>>> publicly visible to anyone else on that cluster, anyone --- including
>>>>> any
>>>>>>>> consumers, or streams apps. This is probably most flexible as for
>>>>>>> sharing,
>>>>>>>> but we are even less assured that if application A is the only writer
>>>>> to
>>>>>>>> that external topic unless we have explicit auth for A on that topic.
>>>>>>>>
>>>>>>>> Aside of that, here are a few more detailed comments about the
>>>>>>>> implementation design itself following your current proposal:
>>>>>>>>
>>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>>> read-only
>>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>>> store
>>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>>> app
>>>>>>>> B's processing timelines, or just like what we do for global stores
>>>>> that
>>>>>>> we
>>>>>>>> just update the read-only stores async?
>>>>>>>> 3) If the answer to both of the above questions are the latter, then
>>>>>>> what's
>>>>>>>> the main difference of adding a read-only store v.s. adding a global
>>>>>>> store?
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dg...@korfinancial.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Matthias,
>>>>>>>>>
>>>>>>>>> Thank you for that feedback, certainly some things to think about. Let
>>>>>>> me
>>>>>>>>> add my thoughts:
>>>>>>>>>
>>>>>>>>> +1 on simplifying the motivation. Was aiming to add more context but I
>>>>>>>>> think you're right, bringing it back to the essence makes more sense.
>>>>>>>>>
>>>>>>>>> I also follow the reasoning of not having leader and follower. Makes
>>>>>>> sense
>>>>>>>>> to view it from a single app point of view.
>>>>>>>>>
>>>>>>>>> As for the API method and its parameters, I wanted to stay close to
>>>>> the
>>>>>>>>> API for adding a regular statestore, but I can perfectly find myself
>>>>> in
>>>>>>>>> defining an addReadOnlyStateStore() method instead.
>>>>>>>>>
>>>>>>>>> I agree the processor approach would be the most flexible one, and
>>>>>>> surely
>>>>>>>>> it allows you to use a processor to base the statestore off an
>>>>> existing
>>>>>>>>> topic. From what I understood from the codebase, there might be a
>>>>>>> problem
>>>>>>>>> when using that statestore. Any processor that would use that
>>>>>>> materialized,
>>>>>>>>> read-only statestore would need to wait for the store to be restored.
>>>>> I
>>>>>>>>> can't find a way to make that possible since processors can't wait for
>>>>>>> the
>>>>>>>>> statestore to be restored. Also, since the statestore would have
>>>>> logging
>>>>>>>>> disabled, it means there is no initial restoration going on. As you
>>>>>>> wrote,
>>>>>>>>> the DSL is already doing this, so I'm pretty sure I'm missing
>>>>> something,
>>>>>>>>> just unable to find what exactly.
>>>>>>>>>
>>>>>>>>> I will rewrite the parts in the KIP to make processor-based the
>>>>>>> preferred
>>>>>>>>> choice, along with the changes to the motivation etc. Only thing to
>>>>>>> figure
>>>>>>>>> out is that restoring behavior to be sure processors of the readonly
>>>>>>>>> statestore aren't working with stale data.
>>>>>>>>>
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>>>> Sent: 19 January 2022 21:31
>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>>>
>>>>>>>>> Daan,
>>>>>>>>>
>>>>>>>>> thanks for the KIP. I personally find the motivation section a little
>>>>>>> bit
>>>>>>>>> confusing. If I understand the KIP correctly, you want to read a topic
>>>>>>> into
>>>>>>>>> a state store (ie, materialize it). This is already possible today.
>>>>>>>>>
>>>>>>>>> Of course, today a "second" changelog topic would be created. It seems
>>>>>>> the
>>>>>>>>> KIP aims to avoid the additional changelog topic, and to allow to
>>>>> re-use
>>>>>>>>> the original input topic (this optimization is already available for
>>>>> the
>>>>>>>>> DSL, but not for the PAPI).
>>>>>>>>>
>>>>>>>>> If my observation is correct, we can simplify the motivation
>>>>> accordingly
>>>>>>>>> (the fact that you want to use this feature to share state across
>>>>>>> different
>>>>>>>>> applications more efficiently seems to be secondary and we could omit
>>>>> it
>>>>>>>>> IMHO to keep the motivation focused).
>>>>>>>>>
>>>>>>>>> As a result, we also don't need to concept of "leader" and "follower".
>>>>>>>>> In the end, Kafka Streams cannot reason/enforce any usage patterns
>>>>>>> across
>>>>>>>>> different apps, but we can only guarantee stuff within a single
>>>>>>> application
>>>>>>>>> (ie, don't create a changelog but reuse an input topic as changelog).
>>>>> It
>>>>>>>>> would simplify the KIP if we remove these parts.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For the API, I am wondering why you propose to pass in
>>>>> `processorNames`?
>>>>>>>>> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
>>>>>>>>> (similar to what we do for `addGlobalStore`)? The provided `Processor`
>>>>>>> must
>>>>>>>>> implement a certain pattern, ie, take each input record and apply it
>>>>>>>>> unmodified to the state store (ie, the Processor will be solely
>>>>>>> responsible
>>>>>>>>> for maintaining the state store). We might also need to pass in other
>>>>>>> arguments,
>>>>>>>>> similar to `addGlobalStore`, into this method. (More below.)
>>>>>>>>>
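>>>>>>>>> Just to illustrate the pattern I mean, a sketch (new Processor API;
>>>>>>>>> imports from org.apache.kafka.streams.processor.api and
>>>>>>>>> org.apache.kafka.streams.state; store name and types made up):
>>>>>>>>>
>>>>>>>>>   class StoreUpdateProcessor implements Processor<String, String, Void, Void> {
>>>>>>>>>       private KeyValueStore<String, String> store;
>>>>>>>>>
>>>>>>>>>       @Override
>>>>>>>>>       public void init(final ProcessorContext<Void, Void> context) {
>>>>>>>>>           store = context.getStateStore("shared-store");
>>>>>>>>>       }
>>>>>>>>>
>>>>>>>>>       @Override
>>>>>>>>>       public void process(final Record<String, String> record) {
>>>>>>>>>           // apply the record unmodified; nothing is forwarded downstream
>>>>>>>>>           store.put(record.key(), record.value());
>>>>>>>>>       }
>>>>>>>>>   }
>>>>>>>>>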
>>>>>>>>> If other processors need to read the state store, they can be
>>>>> connected
>>>>>>> to
>>>>>>>>> it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
>>>>>>>>> approach to keep `processorName` would also be possible, but IMHO all
>>>>>>> those
>>>>>>>>> should only _read_ the state store (but not modify it), to keep a
>>>>> clear
>>>>>>>>> conceptual separation.
>>>>>>>>>
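>>>>>>>>> Ie, something like this (processor and store names made up):
>>>>>>>>>
>>>>>>>>>   topology.connectProcessorAndStateStores("reader-processor", "shared-store");
>>>>>>>>>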
>>>>>>>>> About the method name: wondering if we should use a different name to
>>>>> be
>>>>>>>>> more explicit what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Btw: please omit any code snippets and only put the newly added method
>>>>>>>>> signature in the KIP.
>>>>>>>>>
>>>>>>>>> What I don't yet understand is the section "Allow state stores to
>>>>>>>>> continue listening for changes from their changelog". Can you
>>>>> elaborate?
>>>>>>>>>
>>>>>>>>> About:
>>>>>>>>>
>>>>>>>>>> Since a changelog topic is created with the application id in its
>>>>>>> name,
>>>>>>>>> it would allow us to check in the follower if the changelog topic
>>>>> starts
>>>>>>>>> with our application id. If it doesn’t, we are not allowed to send a
>>>>>>> log.
>>>>>>>>>
>>>>>>>>> The DSL implements this differently, and just disables the changelog
>>>>> for
>>>>>>>>> the state store (ie, for the "follower"). We could do the same thing
>>>>>>>>> (either enforcing that the provided `StoreBuilder` has changelogging
>>>>>>>>> disabled, or by just ignoring it and disabling it hard-coded).
>>>>>>>>>
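>>>>>>>>> Eg, requiring the provided builder to look like this (sketch; store
>>>>>>>>> name and serdes made up, imports from org.apache.kafka.streams.state
>>>>>>>>> and org.apache.kafka.common.serialization):
>>>>>>>>>
>>>>>>>>>   final StoreBuilder<KeyValueStore<String, String>> builder =
>>>>>>>>>       Stores.keyValueStoreBuilder(
>>>>>>>>>               Stores.persistentKeyValueStore("shared-store"),
>>>>>>>>>               Serdes.String(),
>>>>>>>>>               Serdes.String())
>>>>>>>>>           .withLoggingDisabled();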
>>>>>>>>>
>>>>>>>>> Ie, overall I would prefer the "source-processor approach" that you put
>>>>>>>>> into rejected alternatives. Note that the problem you call out, namely
>>>>>>>>>
>>>>>>>>>> Problem with this approach is the lack of restore support
>>>>>>>>> within the state store
>>>>>>>>>
>>>>>>>>> does not apply. A restore is absolutely possible and the DSL already
>>>>>>>>> supports it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Or is your concern with regard to performance? The "source-processor
>>>>>>>>> approach" would have the disadvantage that input data is first
>>>>>>>>> deserialized, fed into the Processor, and then serialized again when
>>>>> put
>>>>>>>>> into the state store. Re-using the state restore code is a good idea
>>>>>>>>> from a performance point of view, but it might require quite some
>>>>>>>>> internal changes (your proposal to "not stop restoring" might not work
>>>>>>>>> as it could trigger quite some undesired side-effects given the
>>>>> current
>>>>>>>>> architecture of Kafka Streams).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>>>>> Hey everyone,
>>>>>>>>>>
>>>>>>>>>> Just created a KIP on sharing statestore state across multiple
>>>>>>>>> applications without duplicating the data on multiple changelog
>>>>> topics.
>>>>>>>>> Have a look and tell me what you think or what to improve. This is my
>>>>>>> first
>>>>>>> one, so please be gentle :)
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>>>>
>>>>>>>>>> Cheers!
>>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>

Re: [DISCUSS] KIP-813 Shared State Stores

Posted by Daan Gertis <dg...@korfinancial.com>.
Hey John,


  *   1. Am I right in thinking that there’s no way to enforce that the stores are actually read-only? It seems like the StoreBuilder interface is too generic for that. If that’s true, I think it’s fine, but we should be sure the JavaDoc clearly states that other processors must not write into these stores (except for the one that feeds it).

Yeah, I couldn’t really find a way to limit it easily. We might be able to throw unsupported exceptions by wrapping the statestore, but that seems kind of brittle and feels a bit like a hack.
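
Something like this guard is what I had in mind (just a sketch; a real
wrapper would need to implement the full KeyValueStore interface and
delegate all of the read methods):

  import org.apache.kafka.streams.state.KeyValueStore;

  class ReadOnlyStoreGuard<K, V> {

      private final KeyValueStore<K, V> delegate;

      ReadOnlyStoreGuard(final KeyValueStore<K, V> delegate) {
          this.delegate = delegate;
      }

      public V get(final K key) {
          return delegate.get(key); // reads pass through
      }

      public void put(final K key, final V value) {
          throw new UnsupportedOperationException("store is read-only");
      }
  }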

Also, the function name clearly states it should be considered read-only.


  *    2. Are you planning for these stores to get standbys as well? I would think so, otherwise the desired purpose of standbys (eliminating restoration latency during failover) would not be served.

Yeah, I think standbys should be applicable here as well. But we get that by implementing these readonly statestores as regular ones, right?
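
E.g. just via the usual knob (sketch):

  import java.util.Properties;
  import org.apache.kafka.streams.StreamsConfig;

  final Properties props = new Properties();
  // one warm replica per task, so failover does not need a full restore
  props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);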

Cheers,
D.


From: John Roesler <vv...@apache.org>
Date: Friday, 1 April 2022 at 04:01
To: dev@kafka.apache.org <de...@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Hi Daan,

Thanks for the KIP!

I just got caught up on the discussion. I have a few small questions, and then I will be ready to vote.

1. Am I right in thinking that there’s no way to enforce that the stores are actually read-only? It seems like the StoreBuilder interface is too generic for that. If that’s true, I think it’s fine, but we should be sure the JavaDoc clearly states that other processors must not write into these stores (except for the one that feeds it).

 2. Are you planning for these stores to get standbys as well? I would think so, otherwise the desired purpose of standbys (eliminating restoration latency during failover) would not be served.

Thanks,
John

On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
> Thanks for updating the KIP. LGTM.
>
> I think we can start a vote.
>
>
>>  I think this might cause issues if your processor is doing a projection of the data.
>
> This is correct. It's a known issue:
> https://issues.apache.org/jira/browse/KAFKA-7663
>
> Global-stores/KTables are designed to put the data into the store
> _unmodified_.
>
>
> -Matthias
>
> On 2/28/22 5:05 AM, Daan Gertis wrote:
>> Updated the KIP to be more aligned with global state store function names.
>>
>> If I remember correctly, during restore the processor will not be used, right? I think this might cause issues if your processor is doing a projection of the data. Either way, I would not add that into this KIP since it is a specific use-case pattern.
>>
>> Unless there is anything more to add or change, I would propose moving to a vote?
>>
>> Cheers!
>> D.
>>
>> From: Matthias J. Sax <mj...@apache.org>
>> Date: Friday, 18 February 2022 at 03:29
>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP!
>>
>> I am wondering if we would need two overloads of `addReadOnlyStateStore`
>> one w/ and one w/o `TimestampExtractor` argument to effectively make it
>> an "optional" parameter?
>>
>> Also wondering if we need to pass in a `String sourceName` and `String
>> processorName` parameters (similar to `addGlobalStore()`?) instead of
>> re-using the store name as currently proposed? -- In general I don't
>> have a strong opinion either way, but it seems to introduce some API
>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>
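>> Ie, something like the following pair (just a sketch, signatures are
>> not final):
>>
>>   <KIn, VIn> Topology addReadOnlyStateStore(
>>       final StoreBuilder<?> storeBuilder,
>>       final String sourceName,
>>       final Deserializer<KIn> keyDeserializer,
>>       final Deserializer<VIn> valueDeserializer,
>>       final String topic,
>>       final String processorName,
>>       final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);
>>
>>   // plus one overload that additionally takes a TimestampExtractor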
>>
>>> Another thing we were confronted with was the restoring of state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will already be at the latest offset, preventing previous data from being restored into the new pod’s statestore.
>>
>> We have already code in-place in the runtime to do the right thing for
>> this case (ie, via DSL source-table changelog optimization). We can
>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>> discuss on the PR later.
>>
>>
>> -Matthias
>>
>>
>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>> Hi Daan,
>>>
>>> I think for the read-only state stores you'd need to slightly augment the
>>> checkpointing logic so that it would still write the checkpointed offsets
>>> while restoring from the changelogs.
>>>
>>>
>>> Guozhang
>>>
>>> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dg...@korfinancial.com>
>>> wrote:
>>>
>>>>> Could you add more details about the signature of
>>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>>> overloads taking different parameters? The KIP only contains some verbal
>>>>> description in the "Implementation Plan" section, which is hard to find
>>>>> and hard to read.
>>>>>
>>>>> The KIP mentions a `ProcessorProvider` -- do you mean
>>>> `ProcessorSupplier`?
>>>>>
>>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>>> synchronization (similar to global state stores)? It seems to be an
>>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>>> source `KTables` (ie, `StreamsBuilder#table()` implementation), having
>>>>> timestamp synchronization enabled seems to be important?
>>>>
>>>> Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
>>>> allow for timestamp synchronization.
>>>>
>>>> Another thing we were confronted with was the restoring of state when the
>>>> actual local storage is gone. For example, we host on K8s with ephemeral
>>>> pods, so there is no persisted storage between pod restarts. However, the
>>>> consumer group will already be at the latest offset, preventing previous
>>>> data from being restored into the new pod’s statestore.
>>>>
>>>> If I remember correctly, there was some checkpoint logic available when
>>>> restoring, but we are bypassing that since logging is disabled on the
>>>> statestore, no?
>>>>
>>>> As always, thanks for your insights.
>>>>
>>>> Cheers,
>>>> D.
>>>>
>>>>
>>>> From: Matthias J. Sax <mj...@apache.org>
>>>> Date: Wednesday, 16 February 2022 at 02:09
>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>> Thanks for updating the KIP.
>>>>
>>>> Could you add more details about the signature of
>>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>>> overloads taking different parameters? The KIP only contains some verbal
>>>> description in the "Implementation Plan" section, which is hard to find
>>>> and hard to read.
>>>>
>>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>>
>>>> About timestamp synchronization: why do you propose to disable timestamp
>>>> synchronization (similar to global state stores)? It seems to be an
>>>> unnecessary limitation? -- Given that we could re-use the new method for
>>>> source `KTables` (ie, `StreamsBuilder#table()` implementation), having
>>>> timestamp synchronization enabled seems to be important?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>>>> Daan,
>>>>>
>>>>> Thanks for the replies, those make sense to me.
>>>>>
>>>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dg...@korfinancial.com>
>>>> wrote:
>>>>>
>>>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>>>
>>>>>> As for your questions Guozhang:
>>>>>>
>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>> read-only
>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>> store
>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>
>>>>>> Good question. Both need to be co-partitioned to have the data
>>>> available.
>>>>>> Another option would be to use IQ to make the request, but that seems
>>>> far
>>>>>> from ideal.
>>>>>>
>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>> app
>>>>>>> B's processing timelines, or just like what we do for global stores
>>>> that
>>>>>> we
>>>>>>> just update the read-only stores async?
>>>>>>
>>>>>> Pretty much the same as we do for global stores.
>>>>>>
>>>>>>> 3) If the answers to both of the above questions are the latter, then
>>>>>> what's
>>>>>>> the main difference of adding a read-only store vs. adding a global
>>>>>> store?
>>>>>>
>>>>>> I think because of the first answer the behavior differs from global
>>>>>> stores.
>>>>>>
>>>>>> Makes sense?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> D.
>>>>>>
>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>> Date: Thursday, 20 January 2022 at 21:12
>>>>>> To: dev@kafka.apache.org <de...@kafka.apache.org>
>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>> Any processor that would use that materialized, read-only statestore
>>>>>> would need to wait for the store to be restored. I can't find a way to
>>>> make
>>>>>> that possible since processors can't wait for the statestore to be
>>>> restored.
>>>>>>
>>>>>> This is built into the runtime already. Nothing to worry about. It's
>>>>>> part of the regular restore logic -- as long as any store is restoring,
>>>>>> all processing is blocked.
>>>>>>
>>>>>>> Also, since the statestore would have logging disabled, it means there
>>>>>> is no initial restoration going on.
>>>>>>
>>>>>> No. When we hook up the input topic as changelog (as the DSL does), we
>>>>>> restore from the input topic during the regular restore phase. The
>>>>>> restore logic does not even know it's reading from the input topic
>>>>>> instead of a "*-changelog" topic.
>>>>>>
>>>>>> Disabling changelogging only affects the write path (ie,
>>>>>> `store.put()`) but not the restore path due to the internal "hookup" of
>>>>>> the input topic inside the restore logic.
>>>>>>
>>>>>> It's not easy to find/understand by reverse engineering I guess, but
>>>>>> it's there.
>>>>>>
>>>>>> One pointer where the actual hookup happens (might help to dig into it
>>>>>> more if you want):
>>>>>>
>>>>>>
>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>>>> Hello Daan,
>>>>>>>
>>>>>>> Thanks for writing the KIP. I just read through it and just my 2c here:
>>>>>>> to me it seems that one of the goals would be to "externalize" the
>>>>>>> internal changelog topic of an application (say A) so that other
>>>>>>> consumers can directly read it --- though technically without any auth,
>>>>>>> anyone knowing the topic name would be able to write to it too;
>>>>>>> conceptually we would just assume that app A is the only writer of that
>>>>>>> topic --- The question I had is how much we want to externalize the
>>>>>>> topic. For example we can, orthogonally to this KIP, just allow users
>>>>>>> to pass in a customized topic name when constructing a state store,
>>>>>>> indicating that application A should use that as the changelog; since
>>>>>>> that topic is created outside of A and is publicly visible on that
>>>>>>> cluster, anyone --- including any consumers or streams apps --- could
>>>>>>> read it. This is probably most flexible for sharing, but we are even
>>>>>>> less assured that application A is the only writer to that external
>>>>>>> topic unless we have explicit auth for A on that topic.
>>>>>>>
>>>>>>> Aside of that, here are a few more detailed comments about the
>>>>>>> implementation design itself following your current proposal:
>>>>>>>
>>>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>>>> different from the num.tasks of app B's sub-topology with that
>>>> read-only
>>>>>>> store? Or are we going to let each task of B keep a whole copy of the
>>>>>> store
>>>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>>>> 2) Are we trying to synchronize the store updates from the changelog to
>>>>>> app
>>>>>>> B's processing timelines, or just like what we do for global stores
>>>> that
>>>>>> we
>>>>>>> just update the read-only stores async?
>>>>>>> 3) If the answers to both of the above questions are the latter, then
>>>>>> what's
>>>>>>> the main difference of adding a read-only store vs. adding a global
>>>>>> store?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dg...@korfinancial.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Matthias,
>>>>>>>>
>>>>>>>> Thank you for that feedback, certainly some things to think about. Let
>>>>>> me
>>>>>>>> add my thoughts:
>>>>>>>>
>>>>>>>> +1 on simplifying the motivation. Was aiming to add more context but I
>>>>>>>> think you're right, bringing it back to the essence makes more sense.
>>>>>>>>
>>>>>>>> I also follow the reasoning of not having leader and follower. Makes
>>>>>> sense
>>>>>>>> to view it from a single app point of view.
>>>>>>>>
>>>>>>>> As for the API method and its parameters, I wanted to stay close to
>>>> the
>>>>>>>> API for adding a regular statestore, but I am perfectly happy with
>>>>>>>> defining an addReadOnlyStateStore() method instead.
>>>>>>>>
>>>>>>>> I agree the processor approach would be the most flexible one, and
>>>>>> surely
>>>>>>>> it allows you to use a processor to base the statestore off an
>>>> existing
>>>>>>>> topic. From what I understood from the codebase, there might be a
>>>>>> problem
>>>>>>>> when using that statestore. Any processor that would use that
>>>>>> materialized,
>>>>>>>> read-only statestore would need to wait for the store to be restored.
>>>> I
>>>>>>>> can't find a way to make that possible since processors can't wait for
>>>>>> the
>>>>>>>> statestore to be restored. Also, since the statestore would have
>>>> logging
>>>>>>>> disabled, it means there is no initial restoration going on. As you
>>>>>> wrote,
>>>>>>>> the DSL is already doing this, so I'm pretty sure I'm missing
>>>> something,
>>>>>>>> just unable to find what exactly.
>>>>>>>>
>>>>>>>> I will rewrite the parts in the KIP to make processor-based the
>>>>>> preferred
>>>>>>>> choice, along with the changes to the motivation etc. Only thing to
>>>>>> figure
>>>>>>>> out is the restoring behavior, to be sure processors of the readonly
>>>>>>>> statestore aren't working with stale data.
>>>>>>>>
>>>>>>>> D.
>>>>>>>>
>>>>>>>> -----Original Message-----
>>>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>>>> Sent: 19 January 2022 21:31
>>>>>>>> To: dev@kafka.apache.org
>>>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>>>
>>>>>>>> Daan,
>>>>>>>>
>>>>>>>> thanks for the KIP. I personally find the motivation section a little
>>>>>> bit
>>>>>>>> confusing. If I understand the KIP correctly, you want to read a topic
>>>>>> into
>>>>>>>> a state store (ie, materialize it). This is already possible today.
>>>>>>>>
>>>>>>>> Of course, today a "second" changelog topic would be created. It seems
>>>>>> the
>>>>>>>> KIP aims to avoid the additional changelog topic, and to allow re-using
>>>>>>>> the original input topic (this optimization is already available for
>>>> the
>>>>>>>> DSL, but not for the PAPI).
>>>>>>>>
>>>>>>>> If my observation is correct, we can simplify the motivation
>>>> accordingly
>>>>>>>> (the fact that you want to use this feature to share state across
>>>>>> different
>>>>>>>> applications more efficiently seems to be secondary and we could omit
>>>> it
>>>>>>>> IMHO to keep the motivation focused).
>>>>>>>>
>>>>>>>> As a result, we also don't need the concept of "leader" and "follower".
>>>>>>>> In the end, Kafka Streams cannot reason/enforce any usage patterns
>>>>>> across
>>>>>>>> different apps, but we can only guarantee stuff within a single
>>>>>> application
>>>>>>>> (ie, don't create a changelog but reuse an input topic as changelog).
>>>> It
>>>>>>>> would simplify the KIP if we remove these parts.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> For the API, I am wondering why you propose to pass in
>>>> `processorNames`?
>>>>>>>> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
>>>>>>>> (similar to what we do for `addGlobalStore`)? The provided `Processor`
>>>>>> must
>>>>>>>> implement a certain pattern, ie, take each input record and apply it
>>>>>>>> unmodified to the state store (ie, the Processor will be solely
>>>>>> responsible
>>>>>>>> for maintaining the state store). We might also need to pass in other
>>>>>> arguments,
>>>>>>>> similar to `addGlobalStore`, into this method. (More below.)
>>>>>>>>
>>>>>>>> If other processors need to read the state store, they can be
>>>> connected
>>>>>> to
>>>>>>>> it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
>>>>>>>> approach to keep `processorName` would also be possible, but IMHO all
>>>>>> those
>>>>>>>> should only _read_ the state store (but not modify it), to keep a
>>>> clear
>>>>>>>> conceptual separation.
>>>>>>>>
>>>>>>>> About the method name: wondering if we should use a different name to
>>>> be
>>>>>>>> more explicit what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Btw: please omit any code snippets and only put the newly added method
>>>>>>>> signature in the KIP.
>>>>>>>>
>>>>>>>> What I don't yet understand is the section "Allow state stores to
>>>>>>>> continue listening for changes from their changelog". Can you
>>>> elaborate?
>>>>>>>>
>>>>>>>> About:
>>>>>>>>
>>>>>>>>> Since a changelog topic is created with the application id in its
>>>>>> name,
>>>>>>>> it would allow us to check in the follower if the changelog topic
>>>> starts
>>>>>>>> with our application id. If it doesn’t, we are not allowed to send a
>>>>>> log.
>>>>>>>>
>>>>>>>> The DSL implements this differently, and just disables the changelog
>>>> for
>>>>>>>> the state store (ie, for the "follower"). We could do the same thing
>>>>>>>> (either enforcing that the provided `StoreBuilder` has changelogging
>>>>>>>> disabled, or by just ignoring it and disabling it hard-coded).
>>>>>>>>
>>>>>>>>
>>>>>>>> Ie, overall I would prefer the "source-processor approach" that you put
>>>>>>>> into rejected alternatives. Note that the problem you call out, namely
>>>>>>>>
>>>>>>>>> Problem with this approach is the lack of restore support
>>>>>>>> within the state store
>>>>>>>>
>>>>>>>> does not apply. A restore is absolutely possible and the DSL already
>>>>>>>> supports it.
>>>>>>>>
>>>>>>>>
>>>>>>>> Or is your concern with regard to performance? The "source-processor
>>>>>>>> approach" would have the disadvantage that input data is first
>>>>>>>> deserialized, fed into the Processor, and then serialized again when
>>>> put
>>>>>>>> into the state store. Re-using the state restore code is a good idea
>>>>>>>> from a performance point of view, but it might require quite some
>>>>>>>> internal changes (your proposal to "not stop restoring" might not work
>>>>>>>> as it could trigger quite some undesired side-effects given the
>>>> current
>>>>>>>> architecture of Kafka Streams).
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>>>> Hey everyone,
>>>>>>>>>
>>>>>>>>> Just created a KIP on sharing statestore state across multiple
>>>>>>>> applications without duplicating the data on multiple changelog
>>>> topics.
>>>>>>>> Have a look and tell me what you think or what to improve. This is my
>>>>>> first
>>>>>> one, so please be gentle :)
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>>>
>>>>>>>>> Cheers!
>>>>>>>>> D.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>