Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2018/02/01 02:19:22 UTC

Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

I am writing a processor and I want its stores to behave like KTables: For
consistency, I don't want to forward values until the stores have been
flushed.

I am looking at `ForwardingCacheFlushListener` and see that it is using
`InternalProcessorContext` to change the current node, perform a forward,
and then set the node back.

Now, I could do the same (`InternalProcessorContext` is public), but
should I? I'm not well versed in the internals, so I am wondering what the
ramifications of this might be.
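
The pattern described in the question (switch the current node, forward, then switch back) can be illustrated with a toy model. This is a minimal sketch in plain Java, not Kafka Streams code: `ToyContext`, `ToyFlushListener`, and the node names are hypothetical stand-ins for the internal classes, which are not part of the public API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a processor context that tracks the "current node".
class ToyContext {
    String currentNode = "upstream";
    final List<String> forwarded = new ArrayList<>();

    // Records which node the entry was forwarded from.
    void forward(String key, String value) {
        forwarded.add(currentNode + ":" + key + "=" + value);
    }
}

// Hypothetical listener: forwards a flushed entry as if it came from ownerNode.
class ToyFlushListener {
    private final ToyContext context;
    private final String ownerNode;

    ToyFlushListener(ToyContext context, String ownerNode) {
        this.context = context;
        this.ownerNode = ownerNode;
    }

    void onFlush(String key, String value) {
        String previous = context.currentNode; // remember the active node
        context.currentNode = ownerNode;       // switch to the node that owns the store
        try {
            context.forward(key, value);
        } finally {
            context.currentNode = previous;    // restore even if forward() throws
        }
    }
}
```

In this sketch the try/finally is what keeps the context consistent: whichever node was active when the flush started is active again afterwards. Code that does the same against the real internal classes would depend on implementation details that may change between releases.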

Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
I was somehow not aware of this:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
... :/

On Thu, Feb 1, 2018 at 11:57 PM, Dmitry Minkovsky <dm...@gmail.com>
wrote:

> Thank you Guozhang.
>
> > related to your consistency requirement of the store? Do you mean
> > flushing the cache to persist into the store or flushing the store
>
> Yes, motivated by consistency, I want to forward the state downstream only
> after LRU cache is persisted into the store on disk, and the store's
> changelog topic has been replicated.
>
> > So if your goal is to achieve de-duping the downstream traffic
>
> I am trying to make sure, to whatever degree is supposed by the library
> now, that downstream processors don't see a message that is the result of
> state that is possibly inconsistent.
>
> Thank you for describing those mechanisms. I will investigate them.
>
> On Thu, Feb 1, 2018 at 1:53 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
>> Hello Dmitry,
>>
>> Forwarding to downstream upon state store flushing is an internal
>> implementation detail that is used by the DSL operators only, and hence we
>> define related classes internals to abstract them away from the users.
>>
>> For your case, one thing I need to clarify is for "flushing the store", is
>> how that is related to your consistency requirement of the store? Do you
>> mean flushing the cache to persist into the store or flushing the store
>> (e.g. flush a rocksDB store) itself? Here are some things for you to
>> notice:
>>
>> 1. A state store can be flushed by calling store.flush() programmatically,
>> and if the store is a cached store it will also automatically flush the
>> cache on top of it to make sure all dirty keys are persisted to the
>> underlying storage.
>> 2. A state store will also be flushed whenever the processor topology calls
>> commit(), which can either be user-triggered (context.commit() ) or based
>> on time period (there is a commit interval config).
>>
>> So if your goal is to achieve de-duping the downstream traffic, you can
>> consider using punctuator to periodically flush the store and send the
>> whole key-value map entries to downstream; if your goal is to only send to
>> downstream when the cache is flushed, you can consider overriding the
>> `flush()` function of the state store, that after the flush, send the whole
>> key-value map entries to downstream.
>>
>>
>> Guozhang
>>
>>
>>
>> On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dm...@gmail.com>
>> wrote:
>>
>> > Right, but I want to forward messages to downstream processor nodes only
>> > when the store flushes. How does that happen automatically
>> > when KTableSourceProcessor sets that up to happen with a TupleForwarder?
>> >
>> > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <ma...@confluent.io>
>> > wrote:
>> >
>> > > If you build a store and enable caching, you get the KTable behavior out
>> > > of the box. No need to write any custom code in the processor itself.
>> > >
>> > >
>> > > StoreBuilder builder =
>> > > Stores.keyValueStoreBuilder(...).withCachingEnabled();
>> > >
>> > > topology.addStateStore(builder, ...)
>> > >
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
>> > > > I am writing a processor and I want its stores to behave like KTables: For
>> > > > consistency, I don't want to forward values until the stores have been
>> > > > flushed.
>> > > >
>> > > > I am looking at `ForwardingCacheFlushListener` and see that it is using
>> > > > `InternalProcessorContext` to change the current node, perform a forward,
>> > > > and then set the node back.
>> > > >
>> > > > Now, I could do the same (`InternalProcessorContext` is public), but:
>> > > > should I? I'm not well versed in the internals, so I am wondering what the
>> > > > ramifications of this might be. Is this okay to do? Should I?
>> > > >
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Thank you Guozhang.

> related to your consistency requirement of the store? Do you mean
> flushing the cache to persist into the store or flushing the store

Yes, motivated by consistency, I want to forward the state downstream only
after the LRU cache is persisted into the store on disk, and the store's
changelog topic has been replicated.

> So if your goal is to achieve de-duping the downstream traffic

I am trying to make sure, to whatever degree is supported by the library
now, that downstream processors don't see a message that is the result of
state that is possibly inconsistent.

Thank you for describing those mechanisms. I will investigate them.


Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Dmitry,

Forwarding to downstream upon state store flushing is an internal
implementation detail that is used by the DSL operators only, and hence we
define the related classes as internal to abstract them away from users.

For your case, one thing I need to clarify about "flushing the store" is
how it relates to your consistency requirement for the store. Do you mean
flushing the cache so that it is persisted into the store, or flushing the
store itself (e.g. flushing a RocksDB store)? Here are some things to note:

1. A state store can be flushed programmatically by calling store.flush().
If the store is a cached store, this also flushes the cache on top of it,
so that all dirty keys are persisted to the underlying storage.
2. A state store is also flushed whenever the processor topology commits.
A commit can be requested by the user (context.commit()) or triggered
periodically, based on the commit interval config.
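
The time-based commit in point 2 is driven by configuration. As a minimal sketch, assuming the standard Kafka Streams property names `commit.interval.ms` and `cache.max.bytes.buffering` (the record cache size from KIP-63; check the documentation for your version, since newer releases may use a different name), the settings could look like this. The class name and the values are illustrative only, and the keys are written as plain strings so the fragment needs no Kafka dependency.

```java
import java.util.Properties;

class StreamsFlushConfig {
    // Builds illustrative settings; the values are examples, not recommendations.
    static Properties build() {
        Properties props = new Properties();
        // How often the topology commits, which also flushes the state stores.
        props.setProperty("commit.interval.ms", "10000");
        // Maximum bytes used for record caching; 0 disables caching.
        props.setProperty("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024));
        return props;
    }
}
```

A shorter commit interval or a smaller cache means flushes happen more often, so fewer updates are collapsed before they are written out.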

So if your goal is to de-duplicate the downstream traffic, you can
consider using a punctuator to periodically flush the store and send all
of its key-value entries downstream; if your goal is to send downstream
only when the cache is flushed, you can consider overriding the `flush()`
function of the state store so that, after the flush, it sends all of the
key-value entries downstream.
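
The second option (send downstream only when the cache is flushed) can be modeled without Kafka. The following is a minimal sketch in plain Java under stated assumptions: `FlushForwardingCache` and its downstream callback are hypothetical names, an in-memory map stands in for the underlying store, and the real `KeyValueStore` interface has more methods than shown here. Unlike the suggestion above, this sketch forwards only the entries that changed since the last flush rather than the whole map.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical write-back cache: buffers writes and forwards only on flush().
class FlushForwardingCache<K, V> {
    private final Map<K, V> store = new HashMap<>();       // stands in for the persistent store
    private final Map<K, V> dirty = new LinkedHashMap<>(); // latest unflushed value per key
    private final BiConsumer<K, V> downstream;

    FlushForwardingCache(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    // Overwrites any earlier unflushed value for the same key (de-duplication).
    void put(K key, V value) {
        dirty.put(key, value);
    }

    // Reads see unflushed writes first, then the store.
    V get(K key) {
        return dirty.containsKey(key) ? dirty.get(key) : store.get(key);
    }

    // Persists the dirty entries first, then forwards each one downstream.
    void flush() {
        store.putAll(dirty);
        dirty.forEach(downstream);
        dirty.clear();
    }
}
```

With this model, two writes to the same key followed by one `flush()` result in a single downstream record carrying the latest value, and nothing is forwarded before the flush.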


Guozhang




Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Right, but I want to forward messages to downstream processor nodes only
when the store flushes. How does that happen automatically, when
KTableSourceProcessor sets it up with a TupleForwarder?


Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
If you build a store and enable caching, you get the KTable behavior out
of the box. No need to write any custom code in the processor itself.


StoreBuilder builder =
    Stores.keyValueStoreBuilder(...).withCachingEnabled();

topology.addStateStore(builder, ...);



-Matthias
