Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2016/12/01 09:19:31 UTC

Is there a way to control pipeline flow to downstream

Hi all,
Say I have a pipeline like this:

topic.aggregateByKey( ...) => to downstream

Now for every message in the topic it will call aggregateByKey and send the
result downstream.

Is there a way to tell the pipeline to push the current aggregation result
downstream only when it gets a certain message?

Or can I configure it so that it does not push downstream until it has
aggregated the results of n messages?

Or can such logic only be built downstream, which would then check and
decide whether it needs to process the current aggregation?
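
For concreteness, here is a minimal sketch of the kind of topology I mean,
written against the 0.10.1 DSL (where aggregateByKey became
groupByKey().aggregate(...)); the topic, store, and application names are
made up:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class AggregationPipeline {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("input-topic");

        // Count records per key. Every incoming record updates the
        // aggregate, and every update is forwarded downstream.
        KTable<String, Long> counts = source
            .groupByKey()
            .aggregate(
                () -> 0L,                                 // initializer
                (key, value, aggregate) -> aggregate + 1, // aggregator
                Serdes.Long(),
                "counts-store");

        // "to downstream": write each aggregate update to an output topic.
        counts.to(Serdes.String(), Serdes.Long(), "output-topic");

        new KafkaStreams(builder, props).start();
    }
}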

Thanks
Sachin

Re: Is there a way to control pipeline flow to downstream

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
Thanks for the link.

What I understand is that when the cache.max.bytes.buffering value is
reached, it will push the aggregation downstream.
What is its default value?
And how can I determine the cache size for my current stream so as to set
an optimal value?
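
For reference, this is roughly how I would set it; a minimal sketch
assuming the 0.10.1 StreamsConfig keys (the 10 MB and 30 s values are
placeholders I picked, not measured optima):

Properties props = new Properties();
// Total bytes available for record caching across all threads of this
// application instance; a larger cache dedups more before forwarding.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Caches are also flushed on every commit, so the commit interval bounds
// how long an aggregate update can be held back.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);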

I also suppose that pushing downstream based on the number of messages
aggregated or the time elapsed is planned future work, not available in the
master branch right now?
That part is of more interest to us.

Thanks
Sachin




On Thu, Dec 1, 2016 at 3:43 PM, Eno Thereska <en...@gmail.com> wrote:

> Hi Sachin,
>
> This landed in 0.10.1, so the docs are at
> http://kafka.apache.org/0101/javadoc/index.html.
>
> This wiki has a good description of how this works:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>
> Eno
>
> > On 1 Dec 2016, at 10:07, Sachin Mittal <sj...@gmail.com> wrote:
> >
> > Hi,
> > I checked the docs (http://kafka.apache.org/0100/javadoc/index.html,
> > class StreamsConfig) but did not find the
> > CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
> >
> > Also, on the first option: "use the record cache to dedup messages with
> > the same key before sending downstream."
> >
> > I did not understand this. How does one implement this option?
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <en...@gmail.com>
> wrote:
> >
> >> Hi Sachin,
> >>
> >> If you are using the DSL, currently there is no way to do fine-grained
> >> control of the downstream sending. There is some coarse-grained control
> >> in that you can use the record cache to dedup messages with the same key
> >> before sending downstream, or you can choose to get all records by
> >> setting the cache to 0:
> >> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>
> >> So it looks like you might want to build such logic downstream.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 1 Dec 2016, at 09:19, Sachin Mittal <sj...@gmail.com> wrote:
> >>>
> >>> Hi all,
> >>> Say I have a pipeline like this:
> >>>
> >>> topic.aggregateByKey( ...) => to downstream
> >>>
> >>> Now for every message in the topic it will call aggregateByKey and
> >>> send the result downstream.
> >>>
> >>> Is there a way to tell the pipeline to push the current aggregation
> >>> result downstream only when it gets a certain message?
> >>>
> >>> Or can I configure it so that it does not push downstream until it
> >>> has aggregated the results of n messages?
> >>>
> >>> Or can such logic only be built downstream, which would then check
> >>> and decide whether it needs to process the current aggregation?
> >>>
> >>> Thanks
> >>> Sachin
> >>
> >>
>
>

Re: Is there a way to control pipeline flow to downstream

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

This landed in 0.10.1, so the docs are at http://kafka.apache.org/0101/javadoc/index.html.

This wiki has a good description of how this works: https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

Eno

> On 1 Dec 2016, at 10:07, Sachin Mittal <sj...@gmail.com> wrote:
> 
> Hi,
> I checked the docs (http://kafka.apache.org/0100/javadoc/index.html,
> class StreamsConfig) but did not find the
> CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
> 
> Also, on the first option: "use the record cache to dedup messages with
> the same key before sending downstream."
> 
> I did not understand this. How does one implement this option?
> 
> Thanks
> Sachin
> 
> 
> On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <en...@gmail.com> wrote:
> 
>> Hi Sachin,
>> 
>> If you are using the DSL, currently there is no way to do fine-grained
>> control of the downstream sending. There is some coarse-grained control in
>> that you can use the record cache to dedup messages with the same key
>> before sending downstream, or you can choose to get all records by setting
>> the cache to 0:
>> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> 
>> So it looks like you might want to build such logic downstream.
>> 
>> Thanks
>> Eno
>> 
>>> On 1 Dec 2016, at 09:19, Sachin Mittal <sj...@gmail.com> wrote:
>>> 
>>> Hi all,
>>> Say I have a pipeline like this:
>>> 
>>> topic.aggregateByKey( ...) => to downstream
>>> 
>>> Now for every message in the topic it will call aggregateByKey and send
>>> the result downstream.
>>> 
>>> Is there a way to tell the pipeline to push the current aggregation
>>> result downstream only when it gets a certain message?
>>> 
>>> Or can I configure it so that it does not push downstream until it has
>>> aggregated the results of n messages?
>>> 
>>> Or can such logic only be built downstream, which would then check and
>>> decide whether it needs to process the current aggregation?
>>> 
>>> Thanks
>>> Sachin
>> 
>> 


Re: Is there a way to control pipeline flow to downstream

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I checked the docs (http://kafka.apache.org/0100/javadoc/index.html,
class StreamsConfig) but did not find the
CACHE_MAX_BYTES_BUFFERING_CONFIG setting.

Also, on the first option: "use the record cache to dedup messages with the
same key before sending downstream."

I did not understand this. How does one implement this option?

Thanks
Sachin


On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <en...@gmail.com> wrote:

> Hi Sachin,
>
> If you are using the DSL, currently there is no way to do fine-grained
> control of the downstream sending. There is some coarse-grained control in
> that you can use the record cache to dedup messages with the same key
> before sending downstream, or you can choose to get all records by setting
> the cache to 0:
> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> So it looks like you might want to build such logic downstream.
>
> Thanks
> Eno
>
> > On 1 Dec 2016, at 09:19, Sachin Mittal <sj...@gmail.com> wrote:
> >
> > Hi all,
> > Say I have a pipeline like this:
> >
> > topic.aggregateByKey( ...) => to downstream
> >
> > Now for every message in the topic it will call aggregateByKey and send
> > the result downstream.
> >
> > Is there a way to tell the pipeline to push the current aggregation
> > result downstream only when it gets a certain message?
> >
> > Or can I configure it so that it does not push downstream until it has
> > aggregated the results of n messages?
> >
> > Or can such logic only be built downstream, which would then check and
> > decide whether it needs to process the current aggregation?
> >
> > Thanks
> > Sachin
>
>

Re: Is there a way to control pipeline flow to downstream

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

If you are using the DSL, currently there is no way to do fine-grained control of the downstream sending. There is some coarse-grained control in that you can use the record cache to dedup messages with the same key before sending downstream, or you can choose to get all records by setting the cache to 0:
e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
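
To make the two modes concrete, a rough sketch (the cache size shown is
illustrative only, not a recommendation):

Properties streamsConfiguration = new Properties();

// Option 1: keep the record cache on (value is the total cache size in
// bytes). Repeated updates to the same key are coalesced in the cache,
// so downstream sees fewer intermediate aggregates.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
                         10 * 1024 * 1024L);

// Option 2: disable caching entirely; every single update is forwarded.
// streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);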

So it looks like you might want to build such logic downstream.
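
For example, a downstream consumer could apply its own rule, such as only
acting on every Nth update per key. A rough sketch, assuming consumerProps
is already set up with bootstrap.servers, group.id, and String/Long
deserializers (topic name and handler are made up):

Map<String, Integer> updateCounts = new HashMap<>();
try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(Collections.singletonList("output-topic"));
    while (true) {
        for (ConsumerRecord<String, Long> record : consumer.poll(1000)) {
            int n = updateCounts.merge(record.key(), 1, Integer::sum);
            if (n % 10 == 0) {           // act only on every 10th update
                handleAggregate(record); // hypothetical application handler
            }
        }
    }
}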

Thanks
Eno

> On 1 Dec 2016, at 09:19, Sachin Mittal <sj...@gmail.com> wrote:
> 
> Hi all,
> Say I have a pipeline like this:
> 
> topic.aggregateByKey( ...) => to downstream
> 
> Now for every message in the topic it will call aggregateByKey and send
> the result downstream.
> 
> Is there a way to tell the pipeline to push the current aggregation
> result downstream only when it gets a certain message?
> 
> Or can I configure it so that it does not push downstream until it has
> aggregated the results of n messages?
> 
> Or can such logic only be built downstream, which would then check and
> decide whether it needs to process the current aggregation?
> 
> Thanks
> Sachin