Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2016/09/04 15:28:08 UTC

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

We recently discussed KIP-63, and I am copying the JIRA discussion
here:

Damian:
> During the code walk-through, Matthias raised a very good point about the use of context().forward being coupled to whether or not caching is enabled. Now that I've had the chance to think about it, I have one potential solution for making this transparent to users of the Processor API.
> 
> We can add another method boolean isCachingEnabled() to the new interface ForwardingStateStoreSupplier. We also add 2 new methods to ProcessorNode:
> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled()
> 
> In TopologyBuilder, when we are creating the ProcessorNodeCacheFlushListener to attach to the ForwardingStateStoreSupplier, we can call ProcessorNode.setStateStoreCachingEnabled(supplier.isCachingEnabled()).
> 
> We add an extra boolean parameter, forward, to ProcessorRecordContextImpl. It will be set to false when constructed from StreamTask and to true when constructed from ProcessorNodeCacheFlushListener. Then in ProcessorRecordContextImpl.forward(..) we add a guard if (shouldForward()), where shouldForward is return forward || !node.stateStoreCachingEnabled();
> 
> Now Processors are free to call context().forward(..) whether caching is enabled or not. If it is enabled, the values just won't get forwarded until the cache evicts/flushes them.
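
To make the guard described above concrete, here is a minimal sketch. It assumes simplified stand-ins (NodeSketch and RecordContextSketch are hypothetical names) rather than the real ProcessorNode and ProcessorRecordContextImpl, and it models "downstream" as a plain list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProcessorNode: only carries the caching flag.
final class NodeSketch {
    private boolean stateStoreCachingEnabled;

    boolean isStateStoreCachingEnabled() {
        return stateStoreCachingEnabled;
    }

    void setStateStoreCachingEnabled(boolean enabled) {
        this.stateStoreCachingEnabled = enabled;
    }
}

// Hypothetical stand-in for ProcessorRecordContextImpl.
final class RecordContextSketch {
    private final NodeSketch node;
    // false when created by the stream task, true when created by the flush listener
    private final boolean forward;
    // Records that actually reached "downstream" in this sketch
    private final List<String> downstream = new ArrayList<>();

    RecordContextSketch(NodeSketch node, boolean forward) {
        this.node = node;
        this.forward = forward;
    }

    // The guard from the proposal: forward || !node.stateStoreCachingEnabled()
    boolean shouldForward() {
        return forward || !node.isStateStoreCachingEnabled();
    }

    void forward(String key, String value) {
        if (shouldForward()) {
            downstream.add(key + "=" + value);
        }
    }

    List<String> forwarded() {
        return downstream;
    }
}
```

In this sketch, with caching enabled a forward() issued from the task-side context is dropped, while the same call from the flush-listener context goes through. With caching disabled, both go through.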


Matthias:
> I guess this is a good solution/workaround. I had something like this in my mind during the call, too.
> 
> However, thinking about the root cause of this issue again, I am not sure if the (overall) design of this KIP is optimal or not. My new concern is that with this caching strategy, we "merge" two concepts into one, and I am not sure if we should do this.
> 
> Currently, message flow and state are decoupled and independent of each other. Thus, if there is a state, updates to the state are completely independent from emitting output records. With the new design, we merge state updates and record emits, limiting the overall flexibility. I guess, from a DSL point of view, this would not be problematic, because in an aggregation and changelog output, each update to the state should result in a downstream record. However, from a Processor API point of view, there are other patterns we want to be able to support, too.
> 
> Basically, for each input record, there are four different patterns that could be applied by the user:
> 
>     no state updates, no output records
>     only state update
>     only output records
>     state updates and output records
> 
> Right now, we go with a design that allows using one of the patterns within a Processor. However, all four patterns could be mixed within a single Processor (pre-KIP design), and this mixture would not be possible any more. If we want to support all four cases, we might not want to merge both into "a single abstraction" as we do in the design of this PR. What if a user just wants to send a record downstream (without any state manipulation)?
> 
> Going back to the KIP design, we move the cache from RocksDB into the processor. However, what we actually wanted to do was to de-duplicate output records. Thus, the newly introduced cache could actually go "after the processor" and could be completely independent from the state. On each call to forward(), the record is put into the cache, and if the cache is full, an actual cache eviction and record forwarding happens. This would make the de-duplication cache independent from the state.
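
The "cache after the processor" idea can be sketched roughly as follows. This is an illustration under stated assumptions, not the KIP's implementation: DedupForwarder is a hypothetical name, the cache is bounded by entry count rather than bytes, eviction is least-recently-used, and "downstream" is a plain list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical de-duplication cache placed after the processor.
final class DedupForwarder {
    private final int capacity;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, String> cache = new LinkedHashMap<>(16, 0.75f, true);
    private final List<String> downstream = new ArrayList<>();

    DedupForwarder(int capacity) {
        this.capacity = capacity;
    }

    // Called in place of an immediate forward: later values for a key overwrite earlier ones.
    void forward(String key, String value) {
        cache.put(key, value);
        if (cache.size() > capacity) {
            // Cache is over capacity: evict the eldest entry and emit it downstream.
            Iterator<Map.Entry<String, String>> it = cache.entrySet().iterator();
            Map.Entry<String, String> eldest = it.next();
            downstream.add(eldest.getKey() + "=" + eldest.getValue());
            it.remove();
        }
    }

    // Called on commit: emit everything that is still buffered.
    void flush() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        cache.clear();
    }

    List<String> emitted() {
        return downstream;
    }
}
```

Because the sketch never touches a state store, a processor could update its state, or not, independently of what ends up being emitted.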


Eno:
> It's not entirely true that the flexibility is limited. For example, what's next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779, where we add the dedup cache to the .to operator. That is not implemented yet.


Damian:
> I think of the 4 patterns you mentioned, only the last one changes, i.e., state updates and output records.
> context.forward() still exists, so you can just send a record downstream without any state manipulation; that behaviour hasn't changed.






On 08/24/2016 03:35 PM, Eno Thereska wrote:
> Hi folks,
> 
> We've been working on a proof-of-concept for KIP-63 and that can now be
> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
> under PR https://github.com/apache/kafka/pull/1752. It is still work in
> progress, however we are confident that the basic structure is there.
> 
> As part of this work, we've also updated the KIP to clarify several things,
> listed here for convenience:
> 
> - Clarify that the optimization is applicable to aggregations and to
> .to operators. It is not applicable to joins.
> - Clarify that for the low-level Processor API, we propose to allow users
> to disable caching on a store-by-store basis using a new
> .enableCaching() call.
> 
> We'll start the voting process shortly for this KIP.
> 
> Thanks
> Eno
> 
> 
> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <en...@gmail.com>
> wrote:
> 
>> Hi there,
>>
>> I have created KIP-63: Unify store and downstream caching in streams
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>
>>
>> Feedback is appreciated.
>>
>> Thank you
>> Eno
>>
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Eno Thereska <en...@gmail.com>.
Hi,

Users have many options for buffering in the Processor API, and it doesn't seem right that we should prescribe a particular one. Also, there is value in continuing to keep the Processor API simple.

As such, we'll remove ".enableCaching" for stores used in the Processor API from the KIP, and simplify the KIP by having it apply to the DSL only.

Thanks
Eno

> On 7 Sep 2016, at 15:41, Damian Guy <da...@gmail.com> wrote:
> 
> Guozhang,
> 
> Some points about what you have mentioned:
> 1. You can't just call context.forward() on the flush listener. You have to
> set some other contextual information (currently ProcessorRecordContext)
> prior to doing this otherwise the nodes you are forwarding to are
> undetermined, i.e, this can be called at any point during the topology or
> on commit.
> 2. It is a bytes cache, so the Processors would need to have the Serdes in
> order to use this pattern.
> 3. the namespace of the cache can't just be processorName or even
> processorName-stateStoreName, it also will need to have something like
> taskId along with it.
> 
> Thanks,
> Damian
> 
> 
> On Wed, 7 Sep 2016 at 00:39 Guozhang Wang <wa...@gmail.com> wrote:
> 
>> Hi Matthias,
>> 
>> I agree with your concerns of coupling with record forwarding with record
>> storing in the state store, and my understanding is that this can (and
>> should) be resolved with the current interface. Here are my thoughts:
>> 
>> 1. The global cache, MemoryLRUCacheBytes, although is currently defined as
>> internal class, since it is exposed in ProcessorContext anyways, should
>> really be a public class anyways that users can access to (I have some
>> other comments about the names, but will rather leave them in the PR).
>> 
>> 2. In the processor API, the users can choose to use the cache to store the
>> intermediate results in the cache, and register the flush listener via
>> addDirtyEntryFlushListener (again some naming suggestions in PR but use it
>> for discussion for now). And as a result, if the old processor code looks
>> like this:
>> 
>> ----------------
>> 
>> process(...) {
>> 
>>  state.put(...);
>>  context.forward(...);
>> }
>> ----------------
>> 
>> Users can now leverage the cache on some of the processors by modifying the
>> code as:
>> 
>> ----------------
>> 
>> init(...) {
>> 
>>  context.getCache().addDirtyEntryFlushListener(processorName,
>> {state.put(...); context.forward(...);});
>> }
>> 
>> process(...) {
>> 
>>  context.getCache().put(processorName, ..);
>> }
>> 
>> ----------------
>> 
>> 3. Note whether or not to apply caching is optional for each processor node
>> now, and is decoupled with its logic of forwarding / storing in persistent
>> state stores.
>> 
>> One may argue that now if users want to make use of the cache, he will need
>> to make code changes; but I think this is a reasonable requirement to users
>> actually, since that 1) currently we do one update-per-incoming-record, and
>> without code changes this behavior will be preserved, and 2) for DSL
>> implementation, we can just follow the above pattern to abstract it from
>> users, so they can pick up these changes automatically.
>> 
>> 
>> Guozhang
>> 
>> 
>> On Tue, Sep 6, 2016 at 7:41 AM, Eno Thereska <en...@gmail.com>
>> wrote:
>> 
>>> A small update to the KIP: the deduping of records using the cache does
>>> not affect the .to operator since we'd have already deduped the KTable
>>> before the operator. Adjusting KIP.
>>> 
>>> Thanks
>>> Eno
>>> 
>>>> On 5 Sep 2016, at 12:43, Eno Thereska <en...@gmail.com> wrote:
>>>> 
>>>> Hi Matthias,
>>>> 
>>>> The motivation for KIP-63 was primarily aggregates and reducing the
>> load
>>> on "both" state stores and downstream. I think there is agreement that
>> for
>>> the DSL the motivation and design make sense.
>>>> 
>>>> For the Processor API: caching is a major component in any system, and
>>> it is difficult to continue to operate as before, without fully
>>> understanding the consequences. Hence, I think this is mostly a case of
>>> educating users to understand the boundaries of the solution.
>>>> 
>>>> Introducing a cache, either for the state store only, or for downstream
>>> forwarding only, or for both, leads to moving from a model where we
>> process
>>> each request end-to-end (today) to one where a request is temporarily
>>> buffered in a cache. In all the cases, this opens up the question of what
>>> to do next once the request then leaves the cache, and how to express
>> that
>>> (future) behaviour. E.g., even when the cache is just for downstream
>>> forwarding (i.e., decoupled from any state store), the processor API user
>>> might be surprised that context.forward() does not immediately do
>> anything.
>>>> 
>>>> I agree that for ultra-flexibility, a processor API user should be able
>>> to choose whether the dedup cache is put 1) on top of a store only, 2) on
>>> forward only, 3) on both store and forward, but given the motivation for
>>> KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a
>>> reasonable choice that provides good default behaviour, without prodding
>>> the user to specify the combinations.
>>>> 
>>>> We need to educate users that if a cache is used in the Processor API,
>>> the forwarding will happen in the future.
>>>> 
>>>> -Eno
>>>> 
>>>> 
>>>> 
>>>>> On 4 Sep 2016, at 19:11, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>>>> 
>>>>>> Processor code should always work; independently if caching is
>> enabled
>>>>> or not.
>>>>> 
>>>>> If we want to get this, I guess we need a quite different design (see
>>> (1)).
>>>>> 
>>>>> The point is, that we want to dedup the output, and not state updates.
>>>>> 
>>>>> It just happens that our starting point was KTable, for which state
>>>>> updates and downstream changelog output is the same thing. Thus, we
>> can
>>>>> just use the internal KTable state to do the deduplication for the
>>>>> downstream changelog.
>>>>> 
>>>>> However, from a general point of view (Processor API view), if we
>> dedup
>>>>> the output, we want dedup/caching for the processor (and not for a
>> state
>>>>> store). Of course, we need a state to do the dedup. For KTable, both
>>>>> things merge into a single abstraction, and we use only a single state
>>>>> instead of two. From a general point of view, we would need two states
>>>>> though (one for the actual state, and one for dedup -- think Processor
>>>>> API -- not DSL).
>>>>> 
>>>>> 
>>>>> Alternative proposal 1:
>>>>> (see also (2) -- which might be better than this one)
>>>>> 
>>>>> Thus, it might be a cleaner design to decouple user-states and
>>>>> dedup-state from each other. If a user enables dedup/caching (for a
>>>>> processor) we add an additional state to do the dedup and this
>>>>> dedup-state is independent from all user states and context.forward()
>>>>> works as always. The dedup state could be hidden from the user and
>> could
>>>>> be a pure in-memory state (no need for any recovery -- only flush on
>>>>> commit). Internally, a context.forward() would call dedupState.put()
>> and
>>>>> trigger actual output if dedup state needs to evict records.
>>>>> 
>>>>> The disadvantage would be, that we end up with two states for KTable.
>>>>> The advantage is, that deduplication can be switched off/on without
>> any
>>>>> Processor code change.
>>>>> 
>>>>> 
>>>>> Alternative proposal 2:
>>>>> 
>>>>> We basically keep the current KIP design, including not to disable
>>>>> context.forward() if a cached state is used. Additionally, for cached
>>>>> state, we rename put() into putAndForward() which is only available
>> for
>>>>> cached states. Thus, in processor code, a state must be explicitly
>> cast
>>>>> into a cached state. We also make the user aware, that an update/put
>> to
>>>>> a state result in downstream output and that context.forward() would
>> be
>>>>> a "direct/non-cached" output.
>>>>> 
>>>>> The disadvantage of this is, that processor code is not independent
>> from
>>>>> caching and thus, caching cannot just be switched on/off (ie, we do
>> not
>>>>> follow the initial statement of this mail). The advantage is, we can
>>>>> keep a single state for KTable and this design is just small changes
>> to
>>>>> the current KIP.
>>>>> 
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>>>>>> Sure, you can use a non-cached state. However, if you write code like
>>>>>> below for a non-cached state, and learn about caching later on, and
>>>>>> think, caching is a cool feature, I want to use it, you would simply
>>>>>> want to enable caching (without breaking your code).
>>>>>> 
>>>>>> Processor code should always work independently if caching is enabled
>>> or
>>>>>> not.
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>>>>>> Hi Matthias,
>>>>>>> 
>>>>>>> Thanks for the good questions.
>>>>>>> 
>>>>>>> There is still the option of not using cached state. If one uses
>>> cached state it will dedup for stores and forwarding further. But you can
>>> always disable caching and do what you say.
>>>>>>> 
>>>>>>> Eno
>>>>>>> 
>>>>>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>>>>>> 
>>>>>>>> Sorry for not being precise. What I meant by "completely" is for a
>>>>>>>> single processor. Assume I want to have the following pattern:
>>>>>>>> 
>>>>>>>> process(...) {
>>>>>>>>   if (someCondition) {
>>>>>>>>     state.put(...);
>>>>>>>>     context.forward(...);
>>>>>>>>   } else {
>>>>>>>>     context.forward(...);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> 
>>>>>>>> Ie, for some record I do update the state and emit output records,
>>> for
>>>>>>>> other records I only emit output records. This work in current
>>> design.
>>>>>>>> However, if a "cached state" would be used, it would not work any
>>> more.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>> 
>>>>>>>>> Thanks for bringing the conversation across to the thread.
>>>>>>>>> 
>>>>>>>>> I think a main limitation would be, that you cannot mix the 4
>>> patterns
>>>>>>>>>> within a single application anymore (iff you use a "caches
>>> state"). If
>>>>>>>>>> you have processor with a "cached state" this disables direct
>>> usage of
>>>>>>>>>> context.forward() completely -- if I understand the design
>>> correctly.
>>>>>>>>>> Thus, if a "cached state" is used, forwarding is only possible
>> via
>>> state
>>>>>>>>>> updates.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> The above statement is not correct. Caching doesn't completely
>>> disable
>>>>>>>>> forwarding, it only disables it for Processors that are using
>> State
>>> Stores.
>>>>>>>>> In all other cases context.forward() works as it does now.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
>> --
>> -- Guozhang
>> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Damian Guy <da...@gmail.com>.
Guozhang,

Some points about what you have mentioned:
1. You can't just call context.forward() in the flush listener. You have to
set some other contextual information (currently ProcessorRecordContext)
prior to doing this; otherwise the nodes you are forwarding to are
undetermined, i.e., this can be called at any point during the topology or
on commit.
2. It is a bytes cache, so the Processors would need to have the Serdes in
order to use this pattern.
3. The namespace of the cache can't just be processorName or even
processorName-stateStoreName; it will also need to include something like
taskId.
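
Points 2 and 3 can be illustrated with a small sketch. Everything here is an assumption for illustration: CacheNamespaceSketch is a hypothetical helper, the separator and the ordering of the namespace parts are arbitrary, and plain UTF-8 string conversion stands in for whatever Serdes a real processor would use.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing a task-scoped namespace and the byte conversion
// a processor would have to perform before using a bytes cache.
final class CacheNamespaceSketch {
    private CacheNamespaceSketch() {
    }

    // Include the task id so two tasks running the same processor do not share entries.
    static String namespace(String taskId, String processorName, String stateStoreName) {
        return taskId + "-" + processorName + "-" + stateStoreName;
    }

    // Stand-in for a serializer: the cache only stores bytes.
    static byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for a deserializer, needed again inside the flush listener.
    static String fromBytes(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

The point of the sketch is only that both the key space and the (de)serialization become the processor author's responsibility under this pattern.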

Thanks,
Damian


On Wed, 7 Sep 2016 at 00:39 Guozhang Wang <wa...@gmail.com> wrote:

> Hi Matthias,
>
> I agree with your concerns of coupling with record forwarding with record
> storing in the state store, and my understanding is that this can (and
> should) be resolved with the current interface. Here are my thoughts:
>
> 1. The global cache, MemoryLRUCacheBytes, although is currently defined as
> internal class, since it is exposed in ProcessorContext anyways, should
> really be a public class anyways that users can access to (I have some
> other comments about the names, but will rather leave them in the PR).
>
> 2. In the processor API, the users can choose to use the cache to store the
> intermediate results in the cache, and register the flush listener via
> addDirtyEntryFlushListener (again some naming suggestions in PR but use it
> for discussion for now). And as a result, if the old processor code looks
> like this:
>
> ----------------
>
> process(...) {
>
>   state.put(...);
>   context.forward(...);
> }
> ----------------
>
> Users can now leverage the cache on some of the processors by modifying the
> code as:
>
> ----------------
>
> init(...) {
>
>   context.getCache().addDirtyEntryFlushListener(processorName,
> {state.put(...); context.forward(...);});
> }
>
> process(...) {
>
>   context.getCache().put(processorName, ..);
> }
>
> ----------------
>
> 3. Note whether or not to apply caching is optional for each processor node
> now, and is decoupled with its logic of forwarding / storing in persistent
> state stores.
>
> One may argue that now if users want to make use of the cache, he will need
> to make code changes; but I think this is a reasonable requirement to users
> actually, since that 1) currently we do one update-per-incoming-record, and
> without code changes this behavior will be preserved, and 2) for DSL
> implementation, we can just follow the above pattern to abstract it from
> users, so they can pick up these changes automatically.
>
>
> Guozhang
>
>
> On Tue, Sep 6, 2016 at 7:41 AM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > A small update to the KIP: the deduping of records using the cache does
> > not affect the .to operator since we'd have already deduped the KTable
> > before the operator. Adjusting KIP.
> >
> > Thanks
> > Eno
> >
> > > On 5 Sep 2016, at 12:43, Eno Thereska <en...@gmail.com> wrote:
> > >
> > > Hi Matthias,
> > >
> > > The motivation for KIP-63 was primarily aggregates and reducing the
> load
> > on "both" state stores and downstream. I think there is agreement that
> for
> > the DSL the motivation and design make sense.
> > >
> > > For the Processor API: caching is a major component in any system, and
> > it is difficult to continue to operate as before, without fully
> > understanding the consequences. Hence, I think this is mostly a case of
> > educating users to understand the boundaries of the solution.
> > >
> > > Introducing a cache, either for the state store only, or for downstream
> > forwarding only, or for both, leads to moving from a model where we
> process
> > each request end-to-end (today) to one where a request is temporarily
> > buffered in a cache. In all the cases, this opens up the question of what
> > to do next once the request then leaves the cache, and how to express
> that
> > (future) behaviour. E.g., even when the cache is just for downstream
> > forwarding (i.e., decoupled from any state store), the processor API user
> > might be surprised that context.forward() does not immediately do
> anything.
> > >
> > > I agree that for ultra-flexibility, a processor API user should be able
> > to choose whether the dedup cache is put 1) on top of a store only, 2) on
> > forward only, 3) on both store and forward, but given the motivation for
> > KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a
> > reasonable choice that provides good default behaviour, without prodding
> > the user to specify the combinations.
> > >
> > > We need to educate users that if a cache is used in the Processor API,
> > the forwarding will happen in the future.
> > >
> > > -Eno
> > >
> > >
> > >
> > >> On 4 Sep 2016, at 19:11, Matthias J. Sax <ma...@confluent.io>
> wrote:
> > >>
> > >>> Processor code should always work; independently if caching is
> enabled
> > >> or not.
> > >>
> > >> If we want to get this, I guess we need a quite different design (see
> > (1)).
> > >>
> > >> The point is, that we want to dedup the output, and not state updates.
> > >>
> > >> It just happens that our starting point was KTable, for which state
> > >> updates and downstream changelog output is the same thing. Thus, we
> can
> > >> just use the internal KTable state to do the deduplication for the
> > >> downstream changelog.
> > >>
> > >> However, from a general point of view (Processor API view), if we
> dedup
> > >> the output, we want dedup/caching for the processor (and not for a
> state
> > >> store). Of course, we need a state to do the dedup. For KTable, both
> > >> things merge into a single abstraction, and we use only a single state
> > >> instead of two. From a general point of view, we would need two states
> > >> though (one for the actual state, and one for dedup -- think Processor
> > >> API -- not DSL).
> > >>
> > >>
> > >> Alternative proposal 1:
> > >> (see also (2) -- which might be better than this one)
> > >>
> > >> Thus, it might be a cleaner design to decouple user-states and
> > >> dedup-state from each other. If a user enables dedup/caching (for a
> > >> processor) we add an additional state to do the dedup and this
> > >> dedup-state is independent from all user states and context.forward()
> > >> works as always. The dedup state could be hidden from the user and
> could
> > >> be a pure in-memory state (no need for any recovery -- only flush on
> > >> commit). Internally, a context.forward() would call dedupState.put()
> and
> > >> trigger actual output if dedup state needs to evict records.
> > >>
> > >> The disadvantage would be, that we end up with two states for KTable.
> > >> The advantage is, that deduplication can be switched off/on without
> any
> > >> Processor code change.
> > >>
> > >>
> > >> Alternative proposal 2:
> > >>
> > >> We basically keep the current KIP design, including not to disable
> > >> context.forward() if a cached state is used. Additionally, for cached
> > >> state, we rename put() into putAndForward() which is only available
> for
> > >> cached states. Thus, in processor code, a state must be explicitly
> cast
> > >> into a cached state. We also make the user aware, that an update/put
> to
> > >> a state result in downstream output and that context.forward() would
> be
> > >> a "direct/non-cached" output.
> > >>
> > >> The disadvantage of this is, that processor code is not independent
> from
> > >> caching and thus, caching cannot just be switched on/off (ie, we do
> not
> > >> follow the initial statement of this mail). The advantage is, we can
> > >> keep a single state for KTable and this design is just small changes
> to
> > >> the current KIP.
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> > >>> Sure, you can use a non-cached state. However, if you write code like
> > >>> below for a non-cached state, and learn about caching later on, and
> > >>> think, caching is a cool feature, I want to use it, you would simply
> > >>> want to enable caching (without breaking your code).
> > >>>
> > >>> Processor code should always work independently if caching is enabled
> > or
> > >>> not.
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
> > >>>> Hi Matthias,
> > >>>>
> > >>>> Thanks for the good questions.
> > >>>>
> > >>>> There is still the option of not using cached state. If one uses
> > cached state it will dedup for stores and forwarding further. But you can
> > always disable caching and do what you say.
> > >>>>
> > >>>> Eno
> > >>>>
> > >>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> > >>>>>
> > >>>>> Sorry for not being precise. What I meant by "completely" is for a
> > >>>>> single processor. Assume I want to have the following pattern:
> > >>>>>
> > >>>>> process(...) {
> > >>>>>   if (someCondition) {
> > >>>>>     state.put(...);
> > >>>>>     context.forward(...);
> > >>>>>   } else {
> > >>>>>     context.forward(...);
> > >>>>>   }
> > >>>>> }
> > >>>>>
> > >>>>> Ie, for some record I do update the state and emit output records,
> > for
> > >>>>> other records I only emit output records. This work in current
> > design.
> > >>>>> However, if a "cached state" would be used, it would not work any
> > more.
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
> > >>>>>> Hi Matthias,
> > >>>>>>
> > >>>>>> Thanks for bringing the conversation across to the thread.
> > >>>>>>
> > >>>>>> I think a main limitation would be, that you cannot mix the 4
> > patterns
> > >>>>>>> within a single application anymore (iff you use a "caches
> > state"). If
> > >>>>>>> you have processor with a "cached state" this disables direct
> > usage of
> > >>>>>>> context.forward() completely -- if I understand the design
> > correctly.
> > >>>>>>> Thus, if a "cached state" is used, forwarding is only possible
> via
> > state
> > >>>>>>> updates.
> > >>>>>>>
> > >>>>>>>
> > >>>>>> The above statement is not correct. Caching doesn't completely
> > disable
> > >>>>>> forwarding, it only disables it for Processors that are using
> State
> > Stores.
> > >>>>>> In all other cases context.forward() works as it does now.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Damian
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Matthias,

I agree with your concerns about coupling record forwarding with record
storing in the state store, and my understanding is that this can (and
should) be resolved with the current interface. Here are my thoughts:

1. The global cache, MemoryLRUCacheBytes, although currently defined as an
internal class, is exposed in ProcessorContext anyway, so it should
really be a public class that users can access (I have some
other comments about the names, but will leave them for the PR).

2. In the Processor API, users can choose to store intermediate results
in the cache and register a flush listener via
addDirtyEntryFlushListener (again, I have some naming suggestions in the
PR, but let's use this name for discussion for now). As a result, if the
old processor code looks like this:

----------------

process(...) {

  state.put(...);
  context.forward(...);
}
----------------

Users can now leverage the cache on some of the processors by modifying the
code as:

----------------

init(...) {

  context.getCache().addDirtyEntryFlushListener(processorName,
{state.put(...); context.forward(...);});
}

process(...) {

  context.getCache().put(processorName, ..);
}

----------------

3. Note that whether or not to apply caching is now optional for each
processor node, and is decoupled from its logic of forwarding / storing in
persistent state stores.
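
The pattern from point 2 can be written out as a runnable sketch. The cache below is hypothetical: SketchCache and CachingProcessorSketch are made-up names, the method names only mirror the pseudocode above, values are typed rather than raw bytes, and flush() is triggered by hand where a real cache would flush on eviction or commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical namespaced cache with one dirty-entry flush listener per namespace.
final class SketchCache {
    private final Map<String, Map<String, Long>> dirty = new HashMap<>();
    private final Map<String, BiConsumer<String, Long>> listeners = new HashMap<>();

    void addDirtyEntryFlushListener(String namespace, BiConsumer<String, Long> listener) {
        listeners.put(namespace, listener);
    }

    // Later puts for the same key overwrite earlier ones, so only the latest value is flushed.
    void put(String namespace, String key, Long value) {
        dirty.computeIfAbsent(namespace, n -> new LinkedHashMap<>()).put(key, value);
    }

    // Hands every buffered entry of the namespace to its listener, then forgets them.
    // In this sketch, entries of a namespace without a listener are simply dropped.
    void flush(String namespace) {
        BiConsumer<String, Long> listener = listeners.get(namespace);
        Map<String, Long> entries = dirty.remove(namespace);
        if (listener == null || entries == null) {
            return;
        }
        entries.forEach(listener);
    }
}

// Hypothetical processor following the init()/process() split from the pseudocode.
final class CachingProcessorSketch {
    final Map<String, Long> state = new HashMap<>();   // stands in for the state store
    final List<String> forwarded = new ArrayList<>();  // stands in for context.forward()
    private final SketchCache cache;
    private final String processorName;

    CachingProcessorSketch(SketchCache cache, String processorName) {
        this.cache = cache;
        this.processorName = processorName;
        // init(): store and forward only when the cache flushes an entry
        cache.addDirtyEntryFlushListener(processorName, (key, value) -> {
            state.put(key, value);
            forwarded.add(key + "=" + value);
        });
    }

    // process(): only buffer the update
    void process(String key, Long value) {
        cache.put(processorName, key, value);
    }
}
```

A processor that keeps the old process() body is unaffected in this sketch, which matches the claim that caching is opt-in per processor node.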

One may argue that users who want to make use of the cache will now need
to make code changes; but I think this is a reasonable requirement,
since 1) currently we do one update per incoming record, and
without code changes this behavior will be preserved, and 2) for the DSL
implementation, we can just follow the above pattern to abstract it from
users, so they can pick up these changes automatically.


Guozhang


On Tue, Sep 6, 2016 at 7:41 AM, Eno Thereska <en...@gmail.com> wrote:

> A small update to the KIP: the deduping of records using the cache does
> not affect the .to operator since we'd have already deduped the KTable
> before the operator. Adjusting KIP.
>
> Thanks
> Eno
>
> > On 5 Sep 2016, at 12:43, Eno Thereska <en...@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > The motivation for KIP-63 was primarily aggregates and reducing the load
> on "both" state stores and downstream. I think there is agreement that for
> the DSL the motivation and design make sense.
> >
> > For the Processor API: caching is a major component in any system, and
> it is difficult to continue to operate as before, without fully
> understanding the consequences. Hence, I think this is mostly a case of
> educating users to understand the boundaries of the solution.
> >
> > Introducing a cache, either for the state store only, or for downstream
> forwarding only, or for both, leads to moving from a model where we process
> each request end-to-end (today) to one where a request is temporarily
> buffered in a cache. In all the cases, this opens up the question of what
> to do next once the request then leaves the cache, and how to express that
> (future) behaviour. E.g., even when the cache is just for downstream
> forwarding (i.e., decoupled from any state store), the processor API user
> might be surprised that context.forward() does not immediately do anything.
> >
> > I agree that for ultra-flexibility, a processor API user should be able
> to choose whether the dedup cache is put 1) on top of a store only, 2) on
> forward only, 3) on both store and forward, but given the motivation for
> KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a
> reasonable choice that provides good default behaviour, without prodding
> the user to specify the combinations.
> >
> > We need to educate users that if a cache is used in the Processor API,
> the forwarding will happen in the future.
> >
> > -Eno
> >
> >
> >
> >> On 4 Sep 2016, at 19:11, Matthias J. Sax <ma...@confluent.io> wrote:
> >>
> >>> Processor code should always work; independently if caching is enabled
> >> or not.
> >>
> >> If we want to get this, I guess we need a quite different design (see
> (1)).
> >>
> >> The point is, that we want to dedup the output, and not state updates.
> >>
> >> It just happens that our starting point was KTable, for which state
> >> updates and downstream changelog output is the same thing. Thus, we can
> >> just use the internal KTable state to do the deduplication for the
> >> downstream changelog.
> >>
> >> However, from a general point of view (Processor API view), if we dedup
> >> the output, we want dedup/caching for the processor (and not for a state
> >> store). Of course, we need a state to do the dedup. For KTable, both
> >> things merge into a single abstraction, and we use only a single state
> >> instead of two. From a general point of view, we would need two states
> >> though (one for the actual state, and one for dedup -- think Processor
> >> API -- not DSL).
> >>
> >>
> >> Alternative proposal 1:
> >> (see also (2) -- which might be better than this one)
> >>
> >> Thus, it might be a cleaner design to decouple user-states and
> >> dedup-state from each other. If a user enables dedup/caching (for a
> >> processor) we add an additional state to do the dedup and this
> >> dedup-state is independent from all user states and context.forward()
> >> works as always. The dedup state could be hidden from the user and could
> >> be a pure in-memory state (no need for any recovery -- only flush on
> >> commit). Internally, a context.forward() would call dedupState.put() and
> >> trigger actual output if dedup state needs to evict records.
> >>
> >> The disadvantage would be, that we end up with two states for KTable.
> >> The advantage is, that deduplication can be switched off/on without any
> >> Processor code change.
> >>
> >>
> >> Alternative proposal 2:
> >>
> >> We basically keep the current KIP design, including not to disable
> >> context.forward() if a cached state is used. Additionally, for cached
> >> state, we rename put() into putAndForward() which is only available for
> >> cached states. Thus, in processor code, a state must be explicitly cast
> >> into a cached state. We also make the user aware, that an update/put to
> >> a state result in downstream output and that context.forward() would be
> >> a "direct/non-cached" output.
> >>
> >> The disadvantage of this is, that processor code is not independent from
> >> caching and thus, caching cannot just be switched on/off (ie, we do not
> >> follow the initial statement of this mail). The advantage is, we can
> >> keep a single state for KTable and this design is just small changes to
> >> the current KIP.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> >>> Sure, you can use a non-cached state. However, if you write code like
> >>> below for a non-cached state, and learn about caching later on, and
> >>> think, caching is a cool feature, I want to use it, you would simply
> >>> want to enable caching (without breaking your code).
> >>>
> >>> Processor code should always work independently if caching is enabled
> >>> or not.
> >>>
> >>> -Matthias
> >>>
> >>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the good questions.
> >>>>
> >>>> There is still the option of not using cached state. If one uses
> >>>> cached state it will dedup for stores and forwarding further. But you can
> >>>> always disable caching and do what you say.
> >>>>
> >>>> Eno
> >>>>
> >>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:
> >>>>>
> >>>>> Sorry for not being precise. What I meant be "completely" is for a
> >>>>> single processor. Assume I want to have the following pattern:
> >>>>>
> >>>>> process(...) {
> >>>>>  if (someCondition) {
> >>>>>    state.put(...)
> >>>>>    context.forward(...);
> >>>>>  } else {
> >>>>>    context.forward(...);
> >>>>> }
> >>>>>
> >>>>> Ie, for some record I do update the state and emit output records, for
> >>>>> other records I only emit output records. This work in current design.
> >>>>> However, if a "cached state" would be used, it would not work any more.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
> >>>>>> Hi Matthias,
> >>>>>>
> >>>>>> Thanks for bringing the conversation across to the thread.
> >>>>>>
> >>>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
> >>>>>>> within a single application anymore (iff you use a "caches state"). If
> >>>>>>> you have processor with a "cached state" this disables direct usage of
> >>>>>>> context.forward() completely -- if I understand the design correctly.
> >>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
> >>>>>>> updates.
> >>>>>>>
> >>>>>>>
> >>>>>> The above statement is not correct. Caching doesn't completely disable
> >>>>>> forwarding, it only disables it for Processors that are using State Stores.
> >>>>>> In all other cases context.forward() works as it does now.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Damian
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Eno Thereska <en...@gmail.com>.
A small update to the KIP: the deduping of records using the cache does not affect the .to operator since we'd have already deduped the KTable before the operator. Adjusting KIP.

Thanks
Eno

> On 5 Sep 2016, at 12:43, Eno Thereska <en...@gmail.com> wrote:
> 
> Hi Matthias,
> 
> The motivation for KIP-63 was primarily aggregates and reducing the load on "both" state stores and downstream. I think there is agreement that for the DSL the motivation and design make sense.
> 
> For the Processor API: caching is a major component in any system, and it is difficult to continue to operate as before, without fully understanding the consequences. Hence, I think this is mostly a case of educating users to understand the boundaries of the solution. 
> 
> Introducing a cache, either for the state store only, or for downstream forwarding only, or for both, leads to moving from a model where we process each request end-to-end (today) to one where a request is temporarily buffered in a cache. In all the cases, this opens up the question of what to do next once the request then leaves the cache, and how to express that (future) behaviour. E.g., even when the cache is just for downstream forwarding (i.e., decoupled from any state store), the processor API user might be surprised that context.forward() does not immediately do anything.
> 
> I agree that for ultra-flexibility, a processor API user should be able to choose whether the dedup cache is put 1) on top of a store only, 2) on forward only, 3) on both store and forward, but given the motivation for KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a reasonable choice that provides good default behaviour, without prodding the user to specify the combinations.
> 
> We need to educate users that if a cache is used in the Processor API, the forwarding will happen in the future. 
> 
> -Eno
> 
> 
> 
>> On 4 Sep 2016, at 19:11, Matthias J. Sax <ma...@confluent.io> wrote:
>> 
>>> Processor code should always work; independently if caching is enabled
>>> or not.
>> 
>> If we want to get this, I guess we need a quite different design (see (1)).
>> 
>> The point is, that we want to dedup the output, and not state updates.
>> 
>> It just happens that our starting point was KTable, for which state
>> updates and downstream changelog output is the same thing. Thus, we can
>> just use the internal KTable state to do the deduplication for the
>> downstream changelog.
>> 
>> However, from a general point of view (Processor API view), if we dedup
>> the output, we want dedup/caching for the processor (and not for a state
>> store). Of course, we need a state to do the dedup. For KTable, both
>> things merge into a single abstraction, and we use only a single state
>> instead of two. From a general point of view, we would need two states
>> though (one for the actual state, and one for dedup -- think Processor
>> API -- not DSL).
>> 
>> 
>> Alternative proposal 1:
>> (see also (2) -- which might be better than this one)
>> 
>> Thus, it might be a cleaner design to decouple user-states and
>> dedup-state from each other. If a user enables dedup/caching (for a
>> processor) we add an additional state to do the dedup and this
>> dedup-state is independent from all user states and context.forward()
>> works as always. The dedup state could be hidden from the user and could
>> be a pure in-memory state (no need for any recovery -- only flush on
>> commit). Internally, a context.forward() would call dedupState.put() and
>> trigger actual output if dedup state needs to evict records.
>> 
>> The disadvantage would be, that we end up with two states for KTable.
>> The advantage is, that deduplication can be switched off/on without any
>> Processor code change.
>> 
>> 
>> Alternative proposal 2:
>> 
>> We basically keep the current KIP design, including not to disable
>> context.forward() if a cached state is used. Additionally, for cached
>> state, we rename put() into putAndForward() which is only available for
>> cached states. Thus, in processor code, a state must be explicitly cast
>> into a cached state. We also make the user aware, that an update/put to
>> a state result in downstream output and that context.forward() would be
>> a "direct/non-cached" output.
>> 
>> The disadvantage of this is, that processor code is not independent from
>> caching and thus, caching cannot just be switched on/off (ie, we do not
>> follow the initial statement of this mail). The advantage is, we can
>> keep a single state for KTable and this design is just small changes to
>> the current KIP.
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>>> Sure, you can use a non-cached state. However, if you write code like
>>> below for a non-cached state, and learn about caching later on, and
>>> think, caching is a cool feature, I want to use it, you would simply
>>> want to enable caching (without breaking your code).
>>> 
>>> Processor code should always work independently if caching is enabled or
>>> not.
>>> 
>>> -Matthias
>>> 
>>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>>> Hi Matthias,
>>>> 
>>>> Thanks for the good questions. 
>>>> 
>>>> There is still the option of not using cached state. If one uses cached state it will dedup for stores and forwarding further. But you can always disable caching and do what you say.
>>>> 
>>>> Eno
>>>> 
>>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:
>>>>> 
>>>>> Sorry for not being precise. What I meant be "completely" is for a
>>>>> single processor. Assume I want to have the following pattern:
>>>>> 
>>>>> process(...) {
>>>>>  if (someCondition) {
>>>>>    state.put(...)
>>>>>    context.forward(...);
>>>>>  } else {
>>>>>    context.forward(...);
>>>>> }
>>>>> 
>>>>> Ie, for some record I do update the state and emit output records, for
>>>>> other records I only emit output records. This work in current design.
>>>>> However, if a "cached state" would be used, it would not work any more.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>>> Hi Matthias,
>>>>>> 
>>>>>> Thanks for bringing the conversation across to the thread.
>>>>>> 
>>>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>>>> within a single application anymore (iff you use a "caches state"). If
>>>>>>> you have processor with a "cached state" this disables direct usage of
>>>>>>> context.forward() completely -- if I understand the design correctly.
>>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>>>> updates.
>>>>>>> 
>>>>>>> 
>>>>>> The above statement is not correct. Caching doesn't completely disable
>>>>>> forwarding, it only disables it for Processors that are using State Stores.
>>>>>> In all other cases context.forward() works as it does now.
>>>>>> 
>>>>>> Thanks,
>>>>>> Damian
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Matthias,

The motivation for KIP-63 was primarily aggregates and reducing the load on "both" state stores and downstream. I think there is agreement that for the DSL the motivation and design make sense.

For the Processor API: caching is a major component in any system, and it is difficult to continue to operate as before, without fully understanding the consequences. Hence, I think this is mostly a case of educating users to understand the boundaries of the solution. 

Introducing a cache, either for the state store only, or for downstream forwarding only, or for both, leads to moving from a model where we process each request end-to-end (today) to one where a request is temporarily buffered in a cache. In all the cases, this opens up the question of what to do next once the request then leaves the cache, and how to express that (future) behaviour. E.g., even when the cache is just for downstream forwarding (i.e., decoupled from any state store), the processor API user might be surprised that context.forward() does not immediately do anything.

I agree that for ultra-flexibility, a processor API user should be able to choose whether the dedup cache is put 1) on top of a store only, 2) on forward only, 3) on both store and forward, but given the motivation for KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a reasonable choice that provides good default behaviour, without prodding the user to specify the combinations.

We need to educate users that if a cache is used in the Processor API, the forwarding will happen in the future. 

-Eno



> On 4 Sep 2016, at 19:11, Matthias J. Sax <ma...@confluent.io> wrote:
> 
>> Processor code should always work; independently if caching is enabled
>> or not.
> 
> If we want to get this, I guess we need a quite different design (see (1)).
> 
> The point is, that we want to dedup the output, and not state updates.
> 
> It just happens that our starting point was KTable, for which state
> updates and downstream changelog output is the same thing. Thus, we can
> just use the internal KTable state to do the deduplication for the
> downstream changelog.
> 
> However, from a general point of view (Processor API view), if we dedup
> the output, we want dedup/caching for the processor (and not for a state
> store). Of course, we need a state to do the dedup. For KTable, both
> things merge into a single abstraction, and we use only a single state
> instead of two. From a general point of view, we would need two states
> though (one for the actual state, and one for dedup -- think Processor
> API -- not DSL).
> 
> 
> Alternative proposal 1:
> (see also (2) -- which might be better than this one)
> 
> Thus, it might be a cleaner design to decouple user-states and
> dedup-state from each other. If a user enables dedup/caching (for a
> processor) we add an additional state to do the dedup and this
> dedup-state is independent from all user states and context.forward()
> works as always. The dedup state could be hidden from the user and could
> be a pure in-memory state (no need for any recovery -- only flush on
> commit). Internally, a context.forward() would call dedupState.put() and
> trigger actual output if dedup state needs to evict records.
> 
> The disadvantage would be, that we end up with two states for KTable.
> The advantage is, that deduplication can be switched off/on without any
> Processor code change.
> 
> 
> Alternative proposal 2:
> 
> We basically keep the current KIP design, including not to disable
> context.forward() if a cached state is used. Additionally, for cached
> state, we rename put() into putAndForward() which is only available for
> cached states. Thus, in processor code, a state must be explicitly cast
> into a cached state. We also make the user aware, that an update/put to
> a state result in downstream output and that context.forward() would be
> a "direct/non-cached" output.
> 
> The disadvantage of this is, that processor code is not independent from
> caching and thus, caching cannot just be switched on/off (ie, we do not
> follow the initial statement of this mail). The advantage is, we can
> keep a single state for KTable and this design is just small changes to
> the current KIP.
> 
> 
> 
> -Matthias
> 
> 
> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>> Sure, you can use a non-cached state. However, if you write code like
>> below for a non-cached state, and learn about caching later on, and
>> think, caching is a cool feature, I want to use it, you would simply
>> want to enable caching (without breaking your code).
>> 
>> Processor code should always work independently if caching is enabled or
>> not.
>> 
>> -Matthias
>> 
>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>> Hi Matthias,
>>> 
>>> Thanks for the good questions. 
>>> 
>>> There is still the option of not using cached state. If one uses cached state it will dedup for stores and forwarding further. But you can always disable caching and do what you say.
>>> 
>>> Eno
>>> 
>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:
>>>> 
>>>> Sorry for not being precise. What I meant be "completely" is for a
>>>> single processor. Assume I want to have the following pattern:
>>>> 
>>>> process(...) {
>>>>   if (someCondition) {
>>>>     state.put(...)
>>>>     context.forward(...);
>>>>   } else {
>>>>     context.forward(...);
>>>> }
>>>> 
>>>> Ie, for some record I do update the state and emit output records, for
>>>> other records I only emit output records. This work in current design.
>>>> However, if a "cached state" would be used, it would not work any more.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>> Hi Matthias,
>>>>> 
>>>>> Thanks for bringing the conversation across to the thread.
>>>>> 
>>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>>> within a single application anymore (iff you use a "caches state"). If
>>>>>> you have processor with a "cached state" this disables direct usage of
>>>>>> context.forward() completely -- if I understand the design correctly.
>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>>> updates.
>>>>>> 
>>>>>> 
>>>>> The above statement is not correct. Caching doesn't completely disable
>>>>> forwarding, it only disables it for Processors that are using State Stores.
>>>>> In all other cases context.forward() works as it does now.
>>>>> 
>>>>> Thanks,
>>>>> Damian
>>>>> 
>>>> 
>>> 
>> 
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> Processor code should always work; independently if caching is enabled
> or not.

If we want to get this, I guess we need a quite different design (see (1)).

The point is, that we want to dedup the output, and not state updates.

It just happens that our starting point was KTable, for which state
updates and downstream changelog output are the same thing. Thus, we can
just use the internal KTable state to do the deduplication for the
downstream changelog.

However, from a general point of view (Processor API view), if we dedup
the output, we want dedup/caching for the processor (and not for a state
store). Of course, we need a state to do the dedup. For KTable, both
things merge into a single abstraction, and we use only a single state
instead of two. From a general point of view, we would need two states
though (one for the actual state, and one for dedup -- think Processor
API -- not DSL).


Alternative proposal 1:
(see also (2) -- which might be better than this one)

Thus, it might be a cleaner design to decouple user-states and
dedup-state from each other. If a user enables dedup/caching (for a
processor) we add an additional state to do the dedup and this
dedup-state is independent from all user states and context.forward()
works as always. The dedup state could be hidden from the user and could
be a pure in-memory state (no need for any recovery -- only flush on
commit). Internally, a context.forward() would call dedupState.put() and
trigger actual output if dedup state needs to evict records.

The disadvantage would be, that we end up with two states for KTable.
The advantage is, that deduplication can be switched off/on without any
Processor code change.
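
To make alternative proposal 1 concrete, here is a minimal sketch of a
dedup state that sits behind context.forward(). It is not taken from the
KIP: DedupForwarder, its capacity parameter, and the eviction policy
(least recently written key first) are assumptions made for illustration,
and the downstream callback merely stands in for the real forwarding path.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical dedup buffer behind context.forward(); not a Kafka Streams class. */
class DedupForwarder<K, V> {
    private final int maxEntries;
    private final BiConsumer<K, V> downstream;
    // Access-ordered map: iteration starts at the least recently written key.
    private final LinkedHashMap<K, V> dedupState = new LinkedHashMap<>(16, 0.75f, true);

    DedupForwarder(int maxEntries, BiConsumer<K, V> downstream) {
        this.maxEntries = maxEntries;
        this.downstream = downstream;
    }

    /** What context.forward() would do internally: overwrite, then evict if over capacity. */
    void forward(K key, V value) {
        dedupState.put(key, value);
        if (dedupState.size() > maxEntries) {
            Iterator<Map.Entry<K, V>> it = dedupState.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            K evictedKey = eldest.getKey();
            V evictedValue = eldest.getValue();
            it.remove();
            downstream.accept(evictedKey, evictedValue); // actual output happens on eviction
        }
    }

    /** Called on commit: emit everything still buffered, then start empty. */
    void flush() {
        dedupState.forEach(downstream);
        dedupState.clear();
    }
}

class DedupForwarderDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        DedupForwarder<String, Integer> forwarder =
            new DedupForwarder<>(2, (k, v) -> out.add(k + "=" + v));
        forwarder.forward("a", 1);
        forwarder.forward("a", 2); // overwrites a=1, nothing emitted yet
        forwarder.forward("b", 1);
        forwarder.forward("c", 1); // capacity exceeded: evicts and emits a=2
        forwarder.flush();         // commit: emits b=1 and c=1
        System.out.println(out);
    }
}
```

In this sketch the user-visible state stores are not involved at all, which
is the property proposal 1 is after: the processor calls forward() the same
way whether or not the dedup buffer is present.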


Alternative proposal 2:

We basically keep the current KIP design, and context.forward() stays
enabled even if a cached state is used. Additionally, for cached
states, we rename put() to putAndForward(), which is only available for
cached states. Thus, in processor code, a state must be explicitly cast
to a cached state. We also make the user aware that an update/put to
a state results in downstream output and that context.forward() would be
a "direct/non-cached" output.

The disadvantage of this is that processor code is not independent of
caching and thus caching cannot just be switched on/off (i.e., we do not
follow the initial statement of this mail). The advantage is that we can
keep a single state for KTable and that this design requires only small
changes to the current KIP.
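
As a rough illustration of alternative proposal 2, the sketch below shows
a processor that explicitly casts its store to a cached store and uses
putAndForward() for cached output and a separate callback for direct
output. Only the method name putAndForward() comes from the proposal;
SimpleStore, CachedStore, CountingProcessor, and the flush-on-commit
behaviour are hypothetical stand-ins, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical read-only view of a state store. */
interface SimpleStore<K, V> {
    V get(K key);
}

/** Hypothetical cached store: its only write path is putAndForward(). */
class CachedStore<K, V> implements SimpleStore<K, V> {
    private final Map<K, V> committed = new HashMap<>();
    private final Map<K, V> dirty = new LinkedHashMap<>();
    private final BiConsumer<K, V> downstream;

    CachedStore(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    @Override
    public V get(K key) {
        V cached = dirty.get(key);
        return cached != null ? cached : committed.get(key);
    }

    /** Update the state; the downstream record is emitted later, on flush. */
    void putAndForward(K key, V value) {
        dirty.put(key, value);
    }

    /** On commit: persist and emit only the latest value per dirty key. */
    void flush() {
        dirty.forEach((k, v) -> {
            committed.put(k, v);
            downstream.accept(k, v);
        });
        dirty.clear();
    }
}

class CountingProcessor {
    private final CachedStore<String, Integer> counts;
    private final BiConsumer<String, Integer> directForward;

    CountingProcessor(SimpleStore<String, Integer> store,
                      BiConsumer<String, Integer> directForward) {
        // Explicit cast: the processor knowingly depends on caching.
        this.counts = (CachedStore<String, Integer>) store;
        this.directForward = directForward;
    }

    void process(String key, boolean updateState) {
        if (updateState) {
            Integer old = counts.get(key);
            counts.putAndForward(key, old == null ? 1 : old + 1);
        } else {
            directForward.accept(key, 0); // "direct/non-cached" output
        }
    }
}

class PutAndForwardDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        CachedStore<String, Integer> store =
            new CachedStore<>((k, v) -> out.add("cached:" + k + "=" + v));
        CountingProcessor processor =
            new CountingProcessor(store, (k, v) -> out.add("direct:" + k));
        processor.process("a", true);
        processor.process("a", true);
        processor.process("b", false);
        store.flush();
        System.out.println(out);
    }
}
```

The cast in the constructor is where the stated disadvantage shows up: the
processor would fail with a ClassCastException if it were handed a
non-cached store, so caching cannot be toggled without touching the code.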



-Matthias


On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> Sure, you can use a non-cached state. However, if you write code like
> below for a non-cached state, and learn about caching later on, and
> think, caching is a cool feature, I want to use it, you would simply
> want to enable caching (without breaking your code).
> 
> Processor code should always work independently if caching is enabled or
> not.
> 
> -Matthias
> 
> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>> Hi Matthias,
>>
>> Thanks for the good questions. 
>>
>> There is still the option of not using cached state. If one uses cached state it will dedup for stores and forwarding further. But you can always disable caching and do what you say.
>>
>> Eno
>>
>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:
>>>
>>> Sorry for not being precise. What I meant be "completely" is for a
>>> single processor. Assume I want to have the following pattern:
>>>
>>>  process(...) {
>>>    if (someCondition) {
>>>      state.put(...)
>>>      context.forward(...);
>>>    } else {
>>>      context.forward(...);
>>>  }
>>>
>>> Ie, for some record I do update the state and emit output records, for
>>> other records I only emit output records. This work in current design.
>>> However, if a "cached state" would be used, it would not work any more.
>>>
>>>
>>> -Matthias
>>>
>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for bringing the conversation across to the thread.
>>>>
>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>> within a single application anymore (iff you use a "caches state"). If
>>>>> you have processor with a "cached state" this disables direct usage of
>>>>> context.forward() completely -- if I understand the design correctly.
>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>> updates.
>>>>>
>>>>>
>>>> The above statement is not correct. Caching doesn't completely disable
>>>> forwarding, it only disables it for Processors that are using State Stores.
>>>> In all other cases context.forward() works as it does now.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>
>>
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sure, you can use a non-cached state. However, if you write code like
the pattern below for a non-cached state, later learn about caching, and
think "caching is a cool feature, I want to use it", you would simply
want to enable caching (without breaking your code).

Processor code should always work, independently of whether caching is
enabled or not.

-Matthias

On 09/04/2016 06:56 PM, Eno Thereska wrote:
> Hi Matthias,
> 
> Thanks for the good questions. 
> 
> There is still the option of not using cached state. If one uses cached state it will dedup for stores and forwarding further. But you can always disable caching and do what you say.
> 
> Eno
> 
>> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:
>>
>> Sorry for not being precise. What I meant be "completely" is for a
>> single processor. Assume I want to have the following pattern:
>>
>>  process(...) {
>>    if (someCondition) {
>>      state.put(...)
>>      context.forward(...);
>>    } else {
>>      context.forward(...);
>>  }
>>
>> Ie, for some record I do update the state and emit output records, for
>> other records I only emit output records. This work in current design.
>> However, if a "cached state" would be used, it would not work any more.
>>
>>
>> -Matthias
>>
>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>> Hi Matthias,
>>>
>>> Thanks for bringing the conversation across to the thread.
>>>
>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>> within a single application anymore (iff you use a "caches state"). If
>>>> you have processor with a "cached state" this disables direct usage of
>>>> context.forward() completely -- if I understand the design correctly.
>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>> updates.
>>>>
>>>>
>>> The above statement is not correct. Caching doesn't completely disable
>>> forwarding, it only disables it for Processors that are using State Stores.
>>> In all other cases context.forward() works as it does now.
>>>
>>> Thanks,
>>> Damian
>>>
>>
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Matthias,

Thanks for the good questions. 

There is still the option of not using cached state. If one uses cached state, it will dedup both for the store and for downstream forwarding. But you can always disable caching and do what you say.

Eno

> On 4 Sep 2016, at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:
> 
> Sorry for not being precise. What I meant be "completely" is for a
> single processor. Assume I want to have the following pattern:
> 
>  process(...) {
>    if (someCondition) {
>      state.put(...)
>      context.forward(...);
>    } else {
>      context.forward(...);
>  }
> 
> Ie, for some record I do update the state and emit output records, for
> other records I only emit output records. This work in current design.
> However, if a "cached state" would be used, it would not work any more.
> 
> 
> -Matthias
> 
> On 09/04/2016 05:58 PM, Damian Guy wrote:
>> Hi Matthias,
>> 
>> Thanks for bringing the conversation across to the thread.
>> 
>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>> within a single application anymore (iff you use a "caches state"). If
>>> you have processor with a "cached state" this disables direct usage of
>>> context.forward() completely -- if I understand the design correctly.
>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>> updates.
>>> 
>>> 
>> The above statement is not correct. Caching doesn't completely disable
>> forwarding, it only disables it for Processors that are using State Stores.
>> In all other cases context.forward() works as it does now.
>> 
>> Thanks,
>> Damian
>> 
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Damian Guy <da...@gmail.com>.
Thanks for clarifying

On 4 September 2016 at 17:36, Matthias J. Sax <ma...@confluent.io> wrote:

> Sorry for not being precise. What I meant be "completely" is for a
> single processor. Assume I want to have the following pattern:
>
>   process(...) {
>     if (someCondition) {
>       state.put(...)
>       context.forward(...);
>     } else {
>       context.forward(...);
>   }
>
> Ie, for some record I do update the state and emit output records, for
> other records I only emit output records. This work in current design.
> However, if a "cached state" would be used, it would not work any more.
>
>
> -Matthias
>
> On 09/04/2016 05:58 PM, Damian Guy wrote:
> > Hi Matthias,
> >
> > Thanks for bringing the conversation across to the thread.
> >
> >> I think a main limitation would be, that you cannot mix the 4 patterns
> >> within a single application anymore (iff you use a "caches state"). If
> >> you have processor with a "cached state" this disables direct usage of
> >> context.forward() completely -- if I understand the design correctly.
> >> Thus, if a "cached state" is used, forwarding is only possible via state
> >> updates.
> >>
> >>
> > The above statement is not correct. Caching doesn't completely disable
> > forwarding, it only disables it for Processors that are using State
> > Stores.
> > In all other cases context.forward() works as it does now.
> >
> > Thanks,
> > Damian
> >
>
>

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sorry for not being precise. What I meant by "completely" is for a
single processor. Assume I want to have the following pattern:

  process(...) {
    if (someCondition) {
      state.put(...);
      context.forward(...);
    } else {
      context.forward(...);
    }
  }

I.e., for some records I update the state and emit output records; for
other records I only emit output records. This works in the current design.
However, if a "cached state" were used, it would not work any more.
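
The effect can be sketched with the guard from the JIRA discussion quoted
at the start of this thread (forward || !node.stateStoreCachingEnabled()).
Everything else here is an assumption for illustration: GuardedContext and
ForwardGuardDemo are hypothetical names, records are plain strings, and the
list named state stands in for state.put(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the record context with the proposed guard. */
class GuardedContext {
    private final boolean fromCacheFlush; // the extra "forward" flag from the proposal
    private final boolean cachingEnabled; // stand-in for node.stateStoreCachingEnabled()
    private final List<String> downstream;

    GuardedContext(boolean fromCacheFlush, boolean cachingEnabled, List<String> downstream) {
        this.fromCacheFlush = fromCacheFlush;
        this.cachingEnabled = cachingEnabled;
        this.downstream = downstream;
    }

    boolean shouldForward() {
        return fromCacheFlush || !cachingEnabled;
    }

    void forward(String record) {
        if (shouldForward()) {
            downstream.add(record);
        }
    }
}

class ForwardGuardDemo {
    /** Runs the pattern above with a context as a stream task would construct it. */
    static List<String> runPattern(boolean cachingEnabled) {
        List<String> downstream = new ArrayList<>();
        List<String> state = new ArrayList<>(); // stand-in for the state store
        GuardedContext context = new GuardedContext(false, cachingEnabled, downstream);
        for (String record : Arrays.asList("update-1", "passthrough-1")) {
            if (record.startsWith("update")) { // someCondition
                state.add(record);             // state.put(...)
                context.forward(record);
            } else {
                context.forward(record);       // no state update on this path
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println("caching off: " + runPattern(false));
        System.out.println("caching on:  " + runPattern(true));
    }
}
```

With caching off, both records reach downstream directly. With caching on,
neither direct forward passes the guard; the updated record could still be
emitted later by a cache flush, but the record on the else branch never
touched the store, so in this sketch nothing would ever emit it.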


-Matthias

On 09/04/2016 05:58 PM, Damian Guy wrote:
> Hi Matthias,
> 
> Thanks for bringing the conversation across to the thread.
> 
>> I think a main limitation would be, that you cannot mix the 4 patterns
>> within a single application anymore (iff you use a "caches state"). If
>> you have processor with a "cached state" this disables direct usage of
>> context.forward() completely -- if I understand the design correctly.
>> Thus, if a "cached state" is used, forwarding is only possible via state
>> updates.
>>
>>
> The above statement is not correct. Caching doesn't completely disable
> forwarding, it only disables it for Processors that are using State Stores.
> In all other cases context.forward() works as it does now.
> 
> Thanks,
> Damian
> 


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by Damian Guy <da...@gmail.com>.
Hi Matthias,

Thanks for bringing the conversation across to the thread.

> I think a main limitation would be, that you cannot mix the 4 patterns
> within a single application anymore (iff you use a "caches state"). If
> you have processor with a "cached state" this disables direct usage of
> context.forward() completely -- if I understand the design correctly.
> Thus, if a "cached state" is used, forwarding is only possible via state
> updates.
>
>
The above statement is not correct. Caching doesn't completely disable
forwarding, it only disables it for Processors that are using State Stores.
In all other cases context.forward() works as it does now.

Thanks,
Damian

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I think a main limitation would be that you cannot mix the 4 patterns
within a single application anymore (iff you use a "cached state"). If
you have a processor with a "cached state", this disables direct usage of
context.forward() completely -- if I understand the design correctly.
Thus, if a "cached state" is used, forwarding is only possible via state
updates.

The approach described above is fine from a DSL point of view. The main
question is whether a "cached state" should be a DSL-internal implementation
detail or should be exposed to the user for Processor API reuse. For the
former, the design is fine; for the latter, IMHO it imposes a limitation
and a hard-to-understand usage pattern on a regular user of the Processor API.

-Matthias


On 09/04/2016 05:28 PM, Matthias J. Sax wrote:
> We had a recent discussion about KIP-63, and I just c&p from the JIRA
> discussion:
> 
> Damian:
>> During the code walk-through, Matthias raised a very good point about the use of context().forward being coupled to whether or not caching is enabled. Now that i've had the chance to think about it I have one potential solution for making this transparent to uses of the Processor API.
>>
>> We can add another method boolean isCachingEnabled() to the new interface ForwardingStateStoreSupplier. We also add 2 new methods to ProcessorNode:
>> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled()
>>
>> In TopologyBuilder when we are creating the ProcessorNodeCacheFlushListener to attach to the ForwardingStateStoreSupplier we can call ProcessorNode.setStateStoreCachingEnabled(supplier.isStateStoreCachingEnabled())
>>
>> We add an extra boolean parameter, forward, to ProcessorRecordContextImpl; it will be set to false when constructed from StreamTask and to true when constructed from ProcessorNodeCacheFlushListener. Then in ProcessorRecordContextImpl.forward(..) we add a guard if (shouldForward()), where shouldForward() is: return forward || !node.stateStoreCachingEnabled();
>>
>> Now Processors are free to call context().forward(..) whether caching is enabled or not. If it is enabled, the values just won't get forwarded until the cache evicts/flushes them.
> 
> 
> Matthias:
>> I guess this is a good solution/workaround. I had something like this in my mind during the call, too.
>>
>> However, thinking about the root cause of this issue again, I am not sure if the (overall) design of this KIP is optimal or not. My new concern is that with this caching strategy, we "merge" two concepts into one, and I am not sure if we should do this.
>>
>> Currently, message flow and state are decoupled and independent of each other. Thus, if there is a state, updates to the state are completely independent from emitting output records. With the new design, we merge state updates and record emits, limiting the overall flexibility. I guess, from a DSL point of view, this would not be problematic, because in an aggregation and changelog output, each update to the state should result in a downstream record. However, from a Processor API point of view, there are other patterns we want to be able to support, too.
>>
>> Basically, for each input record, there are four different patterns that could be applied by the user:
>>
>>     no state updates, no output records
>>     only state update
>>     only output records
>>     state updates and output records
>>
>> Right now, we go with a design that allows one of the patterns to be used within a Processor. However, all 4 patterns could be mixed within a single Processor (pre-KIP design), and this mixture would not be possible any more. If we want to support all four cases, we might not want to merge both into "a single abstraction" as we do in the design of this PR. What if a user just wants to send a record downstream (without any state manipulation)?
>>
>> Going back to the KIP design, we move the cache from RocksDB into the processor. However, what we actually wanted to do was to de-duplicate output records. Thus, the newly introduced cache, could actually go "after the processor" and could be completely independent from the state. Thus, on each call to forward() the record is put into the cache, and if the cache is full, an actual cache eviction and record forwarding happens. This would make the de-duplication cache independent from the state.
> 
> 
> Eno:
>> It's not entirely true that the flexibility is limited. For example, what's next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779 where we add the dedup cache to the "to" operator. That is not implemented yet.
> 
> 
> Damian:
>> I think of the 4 patterns you mentioned only the last one changes, i.e., state updates and output records.
>> context.forward() still exists, so you can just send a record downstream without any state manipulation; that behaviour hasn't changed.
> 
> 
> 
> 
> 
> 
> On 08/24/2016 03:35 PM, Eno Thereska wrote:
>> Hi folks,
>>
>> We've been working on a proof-of-concept for KIP-63 and that can now be
>> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
>> under PR https://github.com/apache/kafka/pull/1752. It is still work in
>> progress, however we are confident that the basic structure is there.
>>
>> As part of this work, we've also updated the KIP to clarify several things,
>> listed here for convenience:
>>
>> - Clarify that the optimization is applicable to aggregations and to
>> operators. It is not applicable to joins.
>> - Clarify that for the low-level Processor API, we propose to allow users
>> to disable caching on a store-by-store basis using a new
>> .enableCaching() call.
>>
>> We'll start the voting process shortly for this KIP.
>>
>> Thanks
>> Eno
>>
>>
>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <en...@gmail.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> I have created KIP-63: Unify store and downstream caching in streams
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>>
>>>
>>> Feedback is appreciated.
>>>
>>> Thank you
>>> Eno
>>>
>>
>