You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2021/09/01 17:43:36 UTC

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Currently the state store cache size default value is 10MB today, which
arguably is rather small. So I'm thinking maybe for this config default to
512MB.

Other than that, LGTM.

On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com> wrote:

> Thanks Guozhang and Sophie.
>
> Yeah a small default value would lower the throughput. I didn't quite
> realise it earlier. It's slightly hard to predict this value so I would
> guess around 1/2 GB to 1 GB? WDYT?
>
> Regarding the renaming of the config and the new metric, sure would include
> it in the KIP.
>
> Lastly, importance would also. be added. I guess Medium should be ok.
>
> Thanks!
> Sagar.
>
>
> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> <so...@confluent.io.invalid> wrote:
>
> > 1) I agree that we should just distribute the bytes evenly, at least for
> > now. It's simpler to understand and
> > we can always change it later, plus it makes sense to keep this aligned
> > with how the cache works today
> >
> > 2) +1 to being conservative in the generous sense, it's just not
> something
> > we can predict with any degree
> > of accuracy and even if we could, the appropriate value is going to
> differ
> > wildly across applications and use
> > cases. We might want to just pick some multiple of the default cache
> size,
> > and maybe do some research on
> > other relevant defaults or sizes (default JVM heap, size of available
> > memory in common hosts eg EC2
> > instances, etc). We don't need to worry as much about erring on the side
> of
> > too big, since other configs like
> > the max.poll.records will help somewhat to keep it from exploding.
> >
> > 4) 100%, I always found the *cache.max.bytes.buffering* config name to be
> > incredibly confusing. Deprecating this in
> > favor of "*statestore.cache.max.bytes*" and aligning it to the new input
> > buffer config sounds good to me to include here.
> >
> > 5) The KIP should list all relevant public-facing changes, including
> > metadata like the config's "Importance". Personally
> > I would recommend Medium, or even High if we're really worried about the
> > default being wrong for a lot of users
> >
> > Thanks for the KIP, besides those few things that Guozhang brought up and
> > the config importance, everything SGTM
> >
> > -Sophie
> >
> > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > 1) I meant for your proposed solution. I.e. to distribute the
> configured
> > > bytes among threads evenly.
> > >
> > > 2) I was actually thinking about making the default a large enough
> value
> > so
> > > that we would not introduce performance regression: thinking about a
> use
> > > case with many partitions and each record may be large, then
> effectively
> > we
> > > would only start pausing when the total bytes buffered is pretty large.
> > If
> > > we set the default value to small, we would be "more aggressive" on
> > pausing
> > > which may impact throughput.
> > >
> > > 3) Yes exactly, this would naturally be at the "partition-group" class
> > > since that represents the task's all input partitions.
> > >
> > > 4) This is just a bold thought, I'm interested to see other's thoughts.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sa...@gmail.com>
> wrote:
> > >
> > > > Thanks Guozhang.
> > > >
> > > > 1) Just for my confirmation, when you say we should proceed with the
> > even
> > > > distribution of bytes, are you referring to the Proposed Solution in
> > the
> > > > KIP or the option you had considered in the JIRA?
> > > > 2) Default value for the config is something that I missed. I agree
> we
> > > > can't have really large values as it might be detrimental to the
> > > > performance. Maybe, as a starting point, we assume that only 1 Stream
> > > Task
> > > > is running so what could be the ideal value in such a scenario?
> > Somewhere
> > > > around 10MB similar to the caching config?
> > > > 3) When you say,  *a task level metric indicating the current totally
> > > > aggregated metrics, * you mean the bytes aggregated at a task level?
> > > > 4) I am ok with the name change, but would like to know others'
> > thoughts.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > Thanks Sagar for writing this PR.
> > > > >
> > > > > I think twice about the options that have been proposed in
> > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel that
> at
> > > the
> > > > > moment it's simpler to just do the even distribution of the
> > configured
> > > > > total bytes. My rationale is that right now we have a static tasks
> ->
> > > > > threads mapping, and hence each partition would only be fetched by
> a
> > > > single
> > > > > thread / consumer at a given time. If in the future we break that
> > > static
> > > > > mapping into dynamic mapping, then we would not be able to do this
> > even
> > > > > distribution. Instead we would have other threads polling from
> > consumer
> > > > > only, and those threads would be responsible for checking the
> config
> > > and
> > > > > pause non-empty partitions if it goes beyond the threshold. But
> since
> > > at
> > > > > that time we would not change the config but just how it would be
> > > > > implemented behind the scenes we would not need another KIP to
> change
> > > it.
> > > > >
> > > > > Some more comments:
> > > > >
> > > > > 1. We need to discuss a bit about the default value of this new
> > config.
> > > > > Personally I think we need to be a bit conservative with large
> values
> > > so
> > > > > that it would not have any perf regression compared with old
> configs
> > > > > especially with large topology and large number of partitions.
> > > > > 2. I looked at the existing metrics, and do not have corresponding
> > > > sensors.
> > > > > How about also adding a task level metric indicating the current
> > > totally
> > > > > aggregated metrics. The reason I do not suggest this metric on the
> > > > > per-thread level is that in the future we may break the static
> > mapping
> > > of
> > > > > tasks -> threads.
> > > > >
> > > > > [optional] As an orthogonal thought, I'm thinking maybe we can
> rename
> > > the
> > > > > other "*cache.max.bytes.buffering*" as "statestore.cache.max.bytes"
> > > (via
> > > > > deprecation of course), piggy-backed in this KIP? Would like to
> hear
> > > > > others' thoughts.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <sa...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I would like to start a discussion on the following KIP:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > > > > >
> > > > > > Thanks!
> > > > > > Sagar.
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Sagar <sa...@gmail.com>.
Hi Guozhang,

Thanks, I have updated the KIP with the mentioned changes.

Thanks!
Sagar.

On Tue, Sep 21, 2021 at 3:45 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Sagar,
>
> Thanks for the added metrics, about its name, if it is proposed as a
> task-level config, then we do not need to prefix its name as `task-`. But
> on the other hand, it's better to give the full description of the metrics,
> like its type name / tag maps / recording levels etc, an example is here:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB
>
> Guozhang
>
> On Mon, Sep 20, 2021 at 10:04 AM Sagar <sa...@gmail.com> wrote:
>
> > Hi All,
> >
> > Bumping this thread again.
> >
> > Thanks!
> > Sagar.
> >
> > On Sat, Sep 11, 2021 at 2:04 PM Sagar <sa...@gmail.com> wrote:
> >
> > > Hi Mathias,
> > >
> > > I missed out on the metrics part.
> > >
> > > I have added the new metric in the proposed changes section along with
> > the
> > > small re-wording that you talked about.
> > >
> > > Let me know if that makes sense.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org>
> > wrote:
> > >
> > >> Thanks for the KIP.
> > >>
> > >> There was some discussion about adding a metric on the thread, but the
> > >> KIP does not contain anything about it. Did we drop this suggestion or
> > >> was the KIP not updated accordingly?
> > >>
> > >>
> > >> Nit:
> > >>
> > >> > This would be a global config applicable per processing topology
> > >>
> > >> Can we change this to `per Kafka Streams instance.`
> > >>
> > >> Atm, a Stream instance executes a single topology, so it does not make
> > >> any effective difference right now. However, it seems better (more
> > >> logical) to bind the config to the instance (not the topology the
> > >> instance executes).
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 9/2/21 6:08 AM, Sagar wrote:
> > >> > Thanks Guozhang and Luke.
> > >> >
> > >> > I have updated the KIP with all the suggested changes.
> > >> >
> > >> > Do you think we could start voting for this?
> > >> >
> > >> > Thanks!
> > >> > Sagar.
> > >> >
> > >> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
> > >> >
> > >> >> Thanks for the KIP. Overall LGTM.
> > >> >>
> > >> >> Just one thought, if we "rename" the config directly as mentioned
> in
> > >> the
> > >> >> KIP, would that break existing applications?
> > >> >> Should we deprecate the old one first, and make the old/new names
> > >> co-exist
> > >> >> for some period of time?
> > >> >>
> > >> >> Public Interfaces
> > >> >>
> > >> >>    - Adding a new config *input.buffer.max.bytes *applicable at a
> > >> topology
> > >> >>    level. The importance of this config would be *Medium*.
> > >> >>    - Renaming *cache.max.bytes.buffering* to
> > >> *statestore.cache.max.bytes*.
> > >> >>
> > >> >>
> > >> >>
> > >> >> Thank you.
> > >> >> Luke
> > >> >>
> > >> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com>
> > >> wrote:
> > >> >>
> > >> >>> Currently the state store cache size default value is 10MB today,
> > >> which
> > >> >>> arguably is rather small. So I'm thinking maybe for this config
> > >> default
> > >> >> to
> > >> >>> 512MB.
> > >> >>>
> > >> >>> Other than that, LGTM.
> > >> >>>
> > >> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sagarmeansocean@gmail.com
> >
> > >> >> wrote:
> > >> >>>
> > >> >>>> Thanks Guozhang and Sophie.
> > >> >>>>
> > >> >>>> Yeah a small default value would lower the throughput. I didn't
> > quite
> > >> >>>> realise it earlier. It's slightly hard to predict this value so I
> > >> would
> > >> >>>> guess around 1/2 GB to 1 GB? WDYT?
> > >> >>>>
> > >> >>>> Regarding the renaming of the config and the new metric, sure
> would
> > >> >>> include
> > >> >>>> it in the KIP.
> > >> >>>>
> > >> >>>> Lastly, importance would also. be added. I guess Medium should be
> > ok.
> > >> >>>>
> > >> >>>> Thanks!
> > >> >>>> Sagar.
> > >> >>>>
> > >> >>>>
> > >> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> > >> >>>> <so...@confluent.io.invalid> wrote:
> > >> >>>>
> > >> >>>>> 1) I agree that we should just distribute the bytes evenly, at
> > least
> > >> >>> for
> > >> >>>>> now. It's simpler to understand and
> > >> >>>>> we can always change it later, plus it makes sense to keep this
> > >> >> aligned
> > >> >>>>> with how the cache works today
> > >> >>>>>
> > >> >>>>> 2) +1 to being conservative in the generous sense, it's just not
> > >> >>>> something
> > >> >>>>> we can predict with any degree
> > >> >>>>> of accuracy and even if we could, the appropriate value is going
> > to
> > >> >>>> differ
> > >> >>>>> wildly across applications and use
> > >> >>>>> cases. We might want to just pick some multiple of the default
> > cache
> > >> >>>> size,
> > >> >>>>> and maybe do some research on
> > >> >>>>> other relevant defaults or sizes (default JVM heap, size of
> > >> available
> > >> >>>>> memory in common hosts eg EC2
> > >> >>>>> instances, etc). We don't need to worry as much about erring on
> > the
> > >> >>> side
> > >> >>>> of
> > >> >>>>> too big, since other configs like
> > >> >>>>> the max.poll.records will help somewhat to keep it from
> exploding.
> > >> >>>>>
> > >> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config
> > name
> > >> >> to
> > >> >>> be
> > >> >>>>> incredibly confusing. Deprecating this in
> > >> >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the
> new
> > >> >>> input
> > >> >>>>> buffer config sounds good to me to include here.
> > >> >>>>>
> > >> >>>>> 5) The KIP should list all relevant public-facing changes,
> > including
> > >> >>>>> metadata like the config's "Importance". Personally
> > >> >>>>> I would recommend Medium, or even High if we're really worried
> > about
> > >> >>> the
> > >> >>>>> default being wrong for a lot of users
> > >> >>>>>
> > >> >>>>> Thanks for the KIP, besides those few things that Guozhang
> brought
> > >> up
> > >> >>> and
> > >> >>>>> the config importance, everything SGTM
> > >> >>>>>
> > >> >>>>> -Sophie
> > >> >>>>>
> > >> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <
> wangguoz@gmail.com
> > >
> > >> >>>> wrote:
> > >> >>>>>
> > >> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
> > >> >>>> configured
> > >> >>>>>> bytes among threads evenly.
> > >> >>>>>>
> > >> >>>>>> 2) I was actually thinking about making the default a large
> > enough
> > >> >>>> value
> > >> >>>>> so
> > >> >>>>>> that we would not introduce performance regression: thinking
> > about
> > >> >> a
> > >> >>>> use
> > >> >>>>>> case with many partitions and each record may be large, then
> > >> >>>> effectively
> > >> >>>>> we
> > >> >>>>>> would only start pausing when the total bytes buffered is
> pretty
> > >> >>> large.
> > >> >>>>> If
> > >> >>>>>> we set the default value to small, we would be "more
> aggressive"
> > on
> > >> >>>>> pausing
> > >> >>>>>> which may impact throughput.
> > >> >>>>>>
> > >> >>>>>> 3) Yes exactly, this would naturally be at the
> "partition-group"
> > >> >>> class
> > >> >>>>>> since that represents the task's all input partitions.
> > >> >>>>>>
> > >> >>>>>> 4) This is just a bold thought, I'm interested to see other's
> > >> >>> thoughts.
> > >> >>>>>>
> > >> >>>>>>
> > >> >>>>>> Guozhang
> > >> >>>>>>
> > >> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <
> sagarmeansocean@gmail.com
> > >
> > >> >>>> wrote:
> > >> >>>>>>
> > >> >>>>>>> Thanks Guozhang.
> > >> >>>>>>>
> > >> >>>>>>> 1) Just for my confirmation, when you say we should proceed
> with
> > >> >>> the
> > >> >>>>> even
> > >> >>>>>>> distribution of bytes, are you referring to the Proposed
> > Solution
> > >> >>> in
> > >> >>>>> the
> > >> >>>>>>> KIP or the option you had considered in the JIRA?
> > >> >>>>>>> 2) Default value for the config is something that I missed. I
> > >> >> agree
> > >> >>>> we
> > >> >>>>>>> can't have really large values as it might be detrimental to
> the
> > >> >>>>>>> performance. Maybe, as a starting point, we assume that only 1
> > >> >>> Stream
> > >> >>>>>> Task
> > >> >>>>>>> is running so what could be the ideal value in such a
> scenario?
> > >> >>>>> Somewhere
> > >> >>>>>>> around 10MB similar to the caching config?
> > >> >>>>>>> 3) When you say,  *a task level metric indicating the current
> > >> >>> totally
> > >> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
> > >> >>> level?
> > >> >>>>>>> 4) I am ok with the name change, but would like to know
> others'
> > >> >>>>> thoughts.
> > >> >>>>>>>
> > >> >>>>>>> Thanks!
> > >> >>>>>>> Sagar.
> > >> >>>>>>>
> > >> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
> > >> >> wangguoz@gmail.com
> > >> >>>>
> > >> >>>>>> wrote:
> > >> >>>>>>>
> > >> >>>>>>>> Thanks Sagar for writing this PR.
> > >> >>>>>>>>
> > >> >>>>>>>> I think twice about the options that have been proposed in
> > >> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> > >> >> that
> > >> >>>> at
> > >> >>>>>> the
> > >> >>>>>>>> moment it's simpler to just do the even distribution of the
> > >> >>>>> configured
> > >> >>>>>>>> total bytes. My rationale is that right now we have a static
> > >> >>> tasks
> > >> >>>> ->
> > >> >>>>>>>> threads mapping, and hence each partition would only be
> fetched
> > >> >>> by
> > >> >>>> a
> > >> >>>>>>> single
> > >> >>>>>>>> thread / consumer at a given time. If in the future we break
> > >> >> that
> > >> >>>>>> static
> > >> >>>>>>>> mapping into dynamic mapping, then we would not be able to do
> > >> >>> this
> > >> >>>>> even
> > >> >>>>>>>> distribution. Instead we would have other threads polling
> from
> > >> >>>>> consumer
> > >> >>>>>>>> only, and those threads would be responsible for checking the
> > >> >>>> config
> > >> >>>>>> and
> > >> >>>>>>>> pause non-empty partitions if it goes beyond the threshold.
> But
> > >> >>>> since
> > >> >>>>>> at
> > >> >>>>>>>> that time we would not change the config but just how it
> would
> > >> >> be
> > >> >>>>>>>> implemented behind the scenes we would not need another KIP
> to
> > >> >>>> change
> > >> >>>>>> it.
> > >> >>>>>>>>
> > >> >>>>>>>> Some more comments:
> > >> >>>>>>>>
> > >> >>>>>>>> 1. We need to discuss a bit about the default value of this
> new
> > >> >>>>> config.
> > >> >>>>>>>> Personally I think we need to be a bit conservative with
> large
> > >> >>>> values
> > >> >>>>>> so
> > >> >>>>>>>> that it would not have any perf regression compared with old
> > >> >>>> configs
> > >> >>>>>>>> especially with large topology and large number of
> partitions.
> > >> >>>>>>>> 2. I looked at the existing metrics, and do not have
> > >> >>> corresponding
> > >> >>>>>>> sensors.
> > >> >>>>>>>> How about also adding a task level metric indicating the
> > >> >> current
> > >> >>>>>> totally
> > >> >>>>>>>> aggregated metrics. The reason I do not suggest this metric
> on
> > >> >>> the
> > >> >>>>>>>> per-thread level is that in the future we may break the
> static
> > >> >>>>> mapping
> > >> >>>>>> of
> > >> >>>>>>>> tasks -> threads.
> > >> >>>>>>>>
> > >> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we
> can
> > >> >>>> rename
> > >> >>>>>> the
> > >> >>>>>>>> other "*cache.max.bytes.buffering*" as
> > >> >>> "statestore.cache.max.bytes"
> > >> >>>>>> (via
> > >> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like
> to
> > >> >>>> hear
> > >> >>>>>>>> others' thoughts.
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>> Guozhang
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
> > >> >> sagarmeansocean@gmail.com
> > >> >>>>
> > >> >>>>>> wrote:
> > >> >>>>>>>>
> > >> >>>>>>>>> Hi All,
> > >> >>>>>>>>>
> > >> >>>>>>>>> I would like to start a discussion on the following KIP:
> > >> >>>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>
> > >> >>>>>>
> > >> >>>>>
> > >> >>>>
> > >> >>>
> > >> >>
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > >> >>>>>>>>>
> > >> >>>>>>>>> Thanks!
> > >> >>>>>>>>> Sagar.
> > >> >>>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>> --
> > >> >>>>>>>> -- Guozhang
> > >> >>>>>>>>
> > >> >>>>>>>
> > >> >>>>>>
> > >> >>>>>>
> > >> >>>>>> --
> > >> >>>>>> -- Guozhang
> > >> >>>>>>
> > >> >>>>>
> > >> >>>>
> > >> >>>
> > >> >>>
> > >> >>> --
> > >> >>> -- Guozhang
> > >> >>>
> > >> >>
> > >> >
> > >>
> > >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Sagar,

Thanks for the added metrics, about its name, if it is proposed as a
task-level config, then we do not need to prefix its name as `task-`. But
on the other hand, it's better to give the full description of the metrics,
like its type name / tag maps / recording levels etc, an example is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB

Guozhang

On Mon, Sep 20, 2021 at 10:04 AM Sagar <sa...@gmail.com> wrote:

> Hi All,
>
> Bumping this thread again.
>
> Thanks!
> Sagar.
>
> On Sat, Sep 11, 2021 at 2:04 PM Sagar <sa...@gmail.com> wrote:
>
> > Hi Mathias,
> >
> > I missed out on the metrics part.
> >
> > I have added the new metric in the proposed changes section along with
> the
> > small re-wording that you talked about.
> >
> > Let me know if that makes sense.
> >
> > Thanks!
> > Sagar.
> >
> > On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> Thanks for the KIP.
> >>
> >> There was some discussion about adding a metric on the thread, but the
> >> KIP does not contain anything about it. Did we drop this suggestion or
> >> was the KIP not updated accordingly?
> >>
> >>
> >> Nit:
> >>
> >> > This would be a global config applicable per processing topology
> >>
> >> Can we change this to `per Kafka Streams instance.`
> >>
> >> Atm, a Stream instance executes a single topology, so it does not make
> >> any effective difference right now. However, it seems better (more
> >> logical) to bind the config to the instance (not the topology the
> >> instance executes).
> >>
> >>
> >> -Matthias
> >>
> >> On 9/2/21 6:08 AM, Sagar wrote:
> >> > Thanks Guozhang and Luke.
> >> >
> >> > I have updated the KIP with all the suggested changes.
> >> >
> >> > Do you think we could start voting for this?
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
> >> >
> >> >> Thanks for the KIP. Overall LGTM.
> >> >>
> >> >> Just one thought, if we "rename" the config directly as mentioned in
> >> the
> >> >> KIP, would that break existing applications?
> >> >> Should we deprecate the old one first, and make the old/new names
> >> co-exist
> >> >> for some period of time?
> >> >>
> >> >> Public Interfaces
> >> >>
> >> >>    - Adding a new config *input.buffer.max.bytes *applicable at a
> >> topology
> >> >>    level. The importance of this config would be *Medium*.
> >> >>    - Renaming *cache.max.bytes.buffering* to
> >> *statestore.cache.max.bytes*.
> >> >>
> >> >>
> >> >>
> >> >> Thank you.
> >> >> Luke
> >> >>
> >> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Currently the state store cache size default value is 10MB today,
> >> which
> >> >>> arguably is rather small. So I'm thinking maybe for this config
> >> default
> >> >> to
> >> >>> 512MB.
> >> >>>
> >> >>> Other than that, LGTM.
> >> >>>
> >> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
> >> >> wrote:
> >> >>>
> >> >>>> Thanks Guozhang and Sophie.
> >> >>>>
> >> >>>> Yeah a small default value would lower the throughput. I didn't
> quite
> >> >>>> realise it earlier. It's slightly hard to predict this value so I
> >> would
> >> >>>> guess around 1/2 GB to 1 GB? WDYT?
> >> >>>>
> >> >>>> Regarding the renaming of the config and the new metric, sure would
> >> >>> include
> >> >>>> it in the KIP.
> >> >>>>
> >> >>>> Lastly, importance would also. be added. I guess Medium should be
> ok.
> >> >>>>
> >> >>>> Thanks!
> >> >>>> Sagar.
> >> >>>>
> >> >>>>
> >> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> >> >>>> <so...@confluent.io.invalid> wrote:
> >> >>>>
> >> >>>>> 1) I agree that we should just distribute the bytes evenly, at
> least
> >> >>> for
> >> >>>>> now. It's simpler to understand and
> >> >>>>> we can always change it later, plus it makes sense to keep this
> >> >> aligned
> >> >>>>> with how the cache works today
> >> >>>>>
> >> >>>>> 2) +1 to being conservative in the generous sense, it's just not
> >> >>>> something
> >> >>>>> we can predict with any degree
> >> >>>>> of accuracy and even if we could, the appropriate value is going
> to
> >> >>>> differ
> >> >>>>> wildly across applications and use
> >> >>>>> cases. We might want to just pick some multiple of the default
> cache
> >> >>>> size,
> >> >>>>> and maybe do some research on
> >> >>>>> other relevant defaults or sizes (default JVM heap, size of
> >> available
> >> >>>>> memory in common hosts eg EC2
> >> >>>>> instances, etc). We don't need to worry as much about erring on
> the
> >> >>> side
> >> >>>> of
> >> >>>>> too big, since other configs like
> >> >>>>> the max.poll.records will help somewhat to keep it from exploding.
> >> >>>>>
> >> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config
> name
> >> >> to
> >> >>> be
> >> >>>>> incredibly confusing. Deprecating this in
> >> >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new
> >> >>> input
> >> >>>>> buffer config sounds good to me to include here.
> >> >>>>>
> >> >>>>> 5) The KIP should list all relevant public-facing changes,
> including
> >> >>>>> metadata like the config's "Importance". Personally
> >> >>>>> I would recommend Medium, or even High if we're really worried
> about
> >> >>> the
> >> >>>>> default being wrong for a lot of users
> >> >>>>>
> >> >>>>> Thanks for the KIP, besides those few things that Guozhang brought
> >> up
> >> >>> and
> >> >>>>> the config importance, everything SGTM
> >> >>>>>
> >> >>>>> -Sophie
> >> >>>>>
> >> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangguoz@gmail.com
> >
> >> >>>> wrote:
> >> >>>>>
> >> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
> >> >>>> configured
> >> >>>>>> bytes among threads evenly.
> >> >>>>>>
> >> >>>>>> 2) I was actually thinking about making the default a large
> enough
> >> >>>> value
> >> >>>>> so
> >> >>>>>> that we would not introduce performance regression: thinking
> about
> >> >> a
> >> >>>> use
> >> >>>>>> case with many partitions and each record may be large, then
> >> >>>> effectively
> >> >>>>> we
> >> >>>>>> would only start pausing when the total bytes buffered is pretty
> >> >>> large.
> >> >>>>> If
> >> >>>>>> we set the default value to small, we would be "more aggressive"
> on
> >> >>>>> pausing
> >> >>>>>> which may impact throughput.
> >> >>>>>>
> >> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
> >> >>> class
> >> >>>>>> since that represents the task's all input partitions.
> >> >>>>>>
> >> >>>>>> 4) This is just a bold thought, I'm interested to see other's
> >> >>> thoughts.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> Guozhang
> >> >>>>>>
> >> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansocean@gmail.com
> >
> >> >>>> wrote:
> >> >>>>>>
> >> >>>>>>> Thanks Guozhang.
> >> >>>>>>>
> >> >>>>>>> 1) Just for my confirmation, when you say we should proceed with
> >> >>> the
> >> >>>>> even
> >> >>>>>>> distribution of bytes, are you referring to the Proposed
> Solution
> >> >>> in
> >> >>>>> the
> >> >>>>>>> KIP or the option you had considered in the JIRA?
> >> >>>>>>> 2) Default value for the config is something that I missed. I
> >> >> agree
> >> >>>> we
> >> >>>>>>> can't have really large values as it might be detrimental to the
> >> >>>>>>> performance. Maybe, as a starting point, we assume that only 1
> >> >>> Stream
> >> >>>>>> Task
> >> >>>>>>> is running so what could be the ideal value in such a scenario?
> >> >>>>> Somewhere
> >> >>>>>>> around 10MB similar to the caching config?
> >> >>>>>>> 3) When you say,  *a task level metric indicating the current
> >> >>> totally
> >> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
> >> >>> level?
> >> >>>>>>> 4) I am ok with the name change, but would like to know others'
> >> >>>>> thoughts.
> >> >>>>>>>
> >> >>>>>>> Thanks!
> >> >>>>>>> Sagar.
> >> >>>>>>>
> >> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
> >> >> wangguoz@gmail.com
> >> >>>>
> >> >>>>>> wrote:
> >> >>>>>>>
> >> >>>>>>>> Thanks Sagar for writing this PR.
> >> >>>>>>>>
> >> >>>>>>>> I think twice about the options that have been proposed in
> >> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> >> >> that
> >> >>>> at
> >> >>>>>> the
> >> >>>>>>>> moment it's simpler to just do the even distribution of the
> >> >>>>> configured
> >> >>>>>>>> total bytes. My rationale is that right now we have a static
> >> >>> tasks
> >> >>>> ->
> >> >>>>>>>> threads mapping, and hence each partition would only be fetched
> >> >>> by
> >> >>>> a
> >> >>>>>>> single
> >> >>>>>>>> thread / consumer at a given time. If in the future we break
> >> >> that
> >> >>>>>> static
> >> >>>>>>>> mapping into dynamic mapping, then we would not be able to do
> >> >>> this
> >> >>>>> even
> >> >>>>>>>> distribution. Instead we would have other threads polling from
> >> >>>>> consumer
> >> >>>>>>>> only, and those threads would be responsible for checking the
> >> >>>> config
> >> >>>>>> and
> >> >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But
> >> >>>> since
> >> >>>>>> at
> >> >>>>>>>> that time we would not change the config but just how it would
> >> >> be
> >> >>>>>>>> implemented behind the scenes we would not need another KIP to
> >> >>>> change
> >> >>>>>> it.
> >> >>>>>>>>
> >> >>>>>>>> Some more comments:
> >> >>>>>>>>
> >> >>>>>>>> 1. We need to discuss a bit about the default value of this new
> >> >>>>> config.
> >> >>>>>>>> Personally I think we need to be a bit conservative with large
> >> >>>> values
> >> >>>>>> so
> >> >>>>>>>> that it would not have any perf regression compared with old
> >> >>>> configs
> >> >>>>>>>> especially with large topology and large number of partitions.
> >> >>>>>>>> 2. I looked at the existing metrics, and do not have
> >> >>> corresponding
> >> >>>>>>> sensors.
> >> >>>>>>>> How about also adding a task level metric indicating the
> >> >> current
> >> >>>>>> totally
> >> >>>>>>>> aggregated metrics. The reason I do not suggest this metric on
> >> >>> the
> >> >>>>>>>> per-thread level is that in the future we may break the static
> >> >>>>> mapping
> >> >>>>>> of
> >> >>>>>>>> tasks -> threads.
> >> >>>>>>>>
> >> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
> >> >>>> rename
> >> >>>>>> the
> >> >>>>>>>> other "*cache.max.bytes.buffering*" as
> >> >>> "statestore.cache.max.bytes"
> >> >>>>>> (via
> >> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to
> >> >>>> hear
> >> >>>>>>>> others' thoughts.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Guozhang
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
> >> >> sagarmeansocean@gmail.com
> >> >>>>
> >> >>>>>> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Hi All,
> >> >>>>>>>>>
> >> >>>>>>>>> I would like to start a discussion on the following KIP:
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks!
> >> >>>>>>>>> Sagar.
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>> -- Guozhang
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> -- Guozhang
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Guozhang
> >> >>>
> >> >>
> >> >
> >>
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Sagar <sa...@gmail.com>.
Hi All,

Bumping this thread again.

Thanks!
Sagar.

On Sat, Sep 11, 2021 at 2:04 PM Sagar <sa...@gmail.com> wrote:

> Hi Mathias,
>
> I missed out on the metrics part.
>
> I have added the new metric in the proposed changes section along with the
> small re-wording that you talked about.
>
> Let me know if that makes sense.
>
> Thanks!
> Sagar.
>
> On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Thanks for the KIP.
>>
>> There was some discussion about adding a metric on the thread, but the
>> KIP does not contain anything about it. Did we drop this suggestion or
>> was the KIP not updated accordingly?
>>
>>
>> Nit:
>>
>> > This would be a global config applicable per processing topology
>>
>> Can we change this to `per Kafka Streams instance.`
>>
>> Atm, a Stream instance executes a single topology, so it does not make
>> any effective difference right now. However, it seems better (more
>> logical) to bind the config to the instance (not the topology the
>> instance executes).
>>
>>
>> -Matthias
>>
>> On 9/2/21 6:08 AM, Sagar wrote:
>> > Thanks Guozhang and Luke.
>> >
>> > I have updated the KIP with all the suggested changes.
>> >
>> > Do you think we could start voting for this?
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
>> >
>> >> Thanks for the KIP. Overall LGTM.
>> >>
>> >> Just one thought, if we "rename" the config directly as mentioned in
>> the
>> >> KIP, would that break existing applications?
>> >> Should we deprecate the old one first, and make the old/new names
>> co-exist
>> >> for some period of time?
>> >>
>> >> Public Interfaces
>> >>
>> >>    - Adding a new config *input.buffer.max.bytes *applicable at a
>> topology
>> >>    level. The importance of this config would be *Medium*.
>> >>    - Renaming *cache.max.bytes.buffering* to
>> *statestore.cache.max.bytes*.
>> >>
>> >>
>> >>
>> >> Thank you.
>> >> Luke
>> >>
>> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com>
>> wrote:
>> >>
>> >>> Currently the state store cache size default value is 10MB today,
>> which
>> >>> arguably is rather small. So I'm thinking maybe for this config
>> default
>> >> to
>> >>> 512MB.
>> >>>
>> >>> Other than that, LGTM.
>> >>>
>> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> Thanks Guozhang and Sophie.
>> >>>>
>> >>>> Yeah a small default value would lower the throughput. I didn't quite
>> >>>> realise it earlier. It's slightly hard to predict this value so I
>> would
>> >>>> guess around 1/2 GB to 1 GB? WDYT?
>> >>>>
>> >>>> Regarding the renaming of the config and the new metric, sure would
>> >>> include
>> >>>> it in the KIP.
>> >>>>
>> >>>> Lastly, importance would also. be added. I guess Medium should be ok.
>> >>>>
>> >>>> Thanks!
>> >>>> Sagar.
>> >>>>
>> >>>>
>> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
>> >>>> <so...@confluent.io.invalid> wrote:
>> >>>>
>> >>>>> 1) I agree that we should just distribute the bytes evenly, at least
>> >>> for
>> >>>>> now. It's simpler to understand and
>> >>>>> we can always change it later, plus it makes sense to keep this
>> >> aligned
>> >>>>> with how the cache works today
>> >>>>>
>> >>>>> 2) +1 to being conservative in the generous sense, it's just not
>> >>>> something
>> >>>>> we can predict with any degree
>> >>>>> of accuracy and even if we could, the appropriate value is going to
>> >>>> differ
>> >>>>> wildly across applications and use
>> >>>>> cases. We might want to just pick some multiple of the default cache
>> >>>> size,
>> >>>>> and maybe do some research on
>> >>>>> other relevant defaults or sizes (default JVM heap, size of
>> available
>> >>>>> memory in common hosts eg EC2
>> >>>>> instances, etc). We don't need to worry as much about erring on the
>> >>> side
>> >>>> of
>> >>>>> too big, since other configs like
>> >>>>> the max.poll.records will help somewhat to keep it from exploding.
>> >>>>>
>> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name
>> >> to
>> >>> be
>> >>>>> incredibly confusing. Deprecating this in
>> >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new
>> >>> input
>> >>>>> buffer config sounds good to me to include here.
>> >>>>>
>> >>>>> 5) The KIP should list all relevant public-facing changes, including
>> >>>>> metadata like the config's "Importance". Personally
>> >>>>> I would recommend Medium, or even High if we're really worried about
>> >>> the
>> >>>>> default being wrong for a lot of users
>> >>>>>
>> >>>>> Thanks for the KIP, besides those few things that Guozhang brought
>> up
>> >>> and
>> >>>>> the config importance, everything SGTM
>> >>>>>
>> >>>>> -Sophie
>> >>>>>
>> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
>> >>>> configured
>> >>>>>> bytes among threads evenly.
>> >>>>>>
>> >>>>>> 2) I was actually thinking about making the default a large enough
>> >>>> value
>> >>>>> so
>> >>>>>> that we would not introduce performance regression: thinking about
>> >> a
>> >>>> use
>> >>>>>> case with many partitions and each record may be large, then
>> >>>> effectively
>> >>>>> we
>> >>>>>> would only start pausing when the total bytes buffered is pretty
>> >>> large.
>> >>>>> If
>> >>>>>> we set the default value to small, we would be "more aggressive" on
>> >>>>> pausing
>> >>>>>> which may impact throughput.
>> >>>>>>
>> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
>> >>> class
>> >>>>>> since that represents the task's all input partitions.
>> >>>>>>
>> >>>>>> 4) This is just a bold thought, I'm interested to see other's
>> >>> thoughts.
>> >>>>>>
>> >>>>>>
>> >>>>>> Guozhang
>> >>>>>>
>> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sa...@gmail.com>
>> >>>> wrote:
>> >>>>>>
>> >>>>>>> Thanks Guozhang.
>> >>>>>>>
>> >>>>>>> 1) Just for my confirmation, when you say we should proceed with
>> >>> the
>> >>>>> even
>> >>>>>>> distribution of bytes, are you referring to the Proposed Solution
>> >>> in
>> >>>>> the
>> >>>>>>> KIP or the option you had considered in the JIRA?
>> >>>>>>> 2) Default value for the config is something that I missed. I
>> >> agree
>> >>>> we
>> >>>>>>> can't have really large values as it might be detrimental to the
>> >>>>>>> performance. Maybe, as a starting point, we assume that only 1
>> >>> Stream
>> >>>>>> Task
>> >>>>>>> is running so what could be the ideal value in such a scenario?
>> >>>>> Somewhere
>> >>>>>>> around 10MB similar to the caching config?
>> >>>>>>> 3) When you say,  *a task level metric indicating the current
>> >>> totally
>> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
>> >>> level?
>> >>>>>>> 4) I am ok with the name change, but would like to know others'
>> >>>>> thoughts.
>> >>>>>>>
>> >>>>>>> Thanks!
>> >>>>>>> Sagar.
>> >>>>>>>
>> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
>> >> wangguoz@gmail.com
>> >>>>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks Sagar for writing this PR.
>> >>>>>>>>
>> >>>>>>>> I think twice about the options that have been proposed in
>> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
>> >> that
>> >>>> at
>> >>>>>> the
>> >>>>>>>> moment it's simpler to just do the even distribution of the
>> >>>>> configured
>> >>>>>>>> total bytes. My rationale is that right now we have a static
>> >>> tasks
>> >>>> ->
>> >>>>>>>> threads mapping, and hence each partition would only be fetched
>> >>> by
>> >>>> a
>> >>>>>>> single
>> >>>>>>>> thread / consumer at a given time. If in the future we break
>> >> that
>> >>>>>> static
>> >>>>>>>> mapping into dynamic mapping, then we would not be able to do
>> >>> this
>> >>>>> even
>> >>>>>>>> distribution. Instead we would have other threads polling from
>> >>>>> consumer
>> >>>>>>>> only, and those threads would be responsible for checking the
>> >>>> config
>> >>>>>> and
>> >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But
>> >>>> since
>> >>>>>> at
>> >>>>>>>> that time we would not change the config but just how it would
>> >> be
>> >>>>>>>> implemented behind the scenes we would not need another KIP to
>> >>>> change
>> >>>>>> it.
>> >>>>>>>>
>> >>>>>>>> Some more comments:
>> >>>>>>>>
>> >>>>>>>> 1. We need to discuss a bit about the default value of this new
>> >>>>> config.
>> >>>>>>>> Personally I think we need to be a bit conservative with large
>> >>>> values
>> >>>>>> so
>> >>>>>>>> that it would not have any perf regression compared with old
>> >>>> configs
>> >>>>>>>> especially with large topology and large number of partitions.
>> >>>>>>>> 2. I looked at the existing metrics, and do not have
>> >>> corresponding
>> >>>>>>> sensors.
>> >>>>>>>> How about also adding a task level metric indicating the
>> >> current
>> >>>>>> totally
>> >>>>>>>> aggregated metrics. The reason I do not suggest this metric on
>> >>> the
>> >>>>>>>> per-thread level is that in the future we may break the static
>> >>>>> mapping
>> >>>>>> of
>> >>>>>>>> tasks -> threads.
>> >>>>>>>>
>> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
>> >>>> rename
>> >>>>>> the
>> >>>>>>>> other "*cache.max.bytes.buffering*" as
>> >>> "statestore.cache.max.bytes"
>> >>>>>> (via
>> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to
>> >>>> hear
>> >>>>>>>> others' thoughts.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
>> >> sagarmeansocean@gmail.com
>> >>>>
>> >>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi All,
>> >>>>>>>>>
>> >>>>>>>>> I would like to start a discussion on the following KIP:
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>> >>>>>>>>>
>> >>>>>>>>> Thanks!
>> >>>>>>>>> Sagar.
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>> -- Guozhang
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> -- Guozhang
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>> --
>> >>> -- Guozhang
>> >>>
>> >>
>> >
>>
>

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Sagar <sa...@gmail.com>.
Hi Mathias,

I missed out on the metrics part.

I have added the new metric in the proposed changes section along with the
small re-wording that you talked about.

Let me know if that makes sense.

Thanks!
Sagar.

On Fri, Sep 10, 2021 at 3:45 AM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP.
>
> There was some discussion about adding a metric on the thread, but the
> KIP does not contain anything about it. Did we drop this suggestion or
> was the KIP not updated accordingly?
>
>
> Nit:
>
> > This would be a global config applicable per processing topology
>
> Can we change this to `per Kafka Streams instance.`
>
> Atm, a Stream instance executes a single topology, so it does not make
> any effective difference right now. However, it seems better (more
> logical) to bind the config to the instance (not the topology the
> instance executes).
>
>
> -Matthias
>
> On 9/2/21 6:08 AM, Sagar wrote:
> > Thanks Guozhang and Luke.
> >
> > I have updated the KIP with all the suggested changes.
> >
> > Do you think we could start voting for this?
> >
> > Thanks!
> > Sagar.
> >
> > On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
> >
> >> Thanks for the KIP. Overall LGTM.
> >>
> >> Just one thought, if we "rename" the config directly as mentioned in the
> >> KIP, would that break existing applications?
> >> Should we deprecate the old one first, and make the old/new names
> co-exist
> >> for some period of time?
> >>
> >> Public Interfaces
> >>
> >>    - Adding a new config *input.buffer.max.bytes *applicable at a
> topology
> >>    level. The importance of this config would be *Medium*.
> >>    - Renaming *cache.max.bytes.buffering* to
> *statestore.cache.max.bytes*.
> >>
> >>
> >>
> >> Thank you.
> >> Luke
> >>
> >> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> >>
> >>> Currently the state store cache size default value is 10MB today, which
> >>> arguably is rather small. So I'm thinking maybe for this config default
> >> to
> >>> 512MB.
> >>>
> >>> Other than that, LGTM.
> >>>
> >>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
> >> wrote:
> >>>
> >>>> Thanks Guozhang and Sophie.
> >>>>
> >>>> Yeah a small default value would lower the throughput. I didn't quite
> >>>> realise it earlier. It's slightly hard to predict this value so I
> would
> >>>> guess around 1/2 GB to 1 GB? WDYT?
> >>>>
> >>>> Regarding the renaming of the config and the new metric, sure would
> >>> include
> >>>> it in the KIP.
> >>>>
> >>>> Lastly, importance would also. be added. I guess Medium should be ok.
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>>
> >>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> >>>> <so...@confluent.io.invalid> wrote:
> >>>>
> >>>>> 1) I agree that we should just distribute the bytes evenly, at least
> >>> for
> >>>>> now. It's simpler to understand and
> >>>>> we can always change it later, plus it makes sense to keep this
> >> aligned
> >>>>> with how the cache works today
> >>>>>
> >>>>> 2) +1 to being conservative in the generous sense, it's just not
> >>>> something
> >>>>> we can predict with any degree
> >>>>> of accuracy and even if we could, the appropriate value is going to
> >>>> differ
> >>>>> wildly across applications and use
> >>>>> cases. We might want to just pick some multiple of the default cache
> >>>> size,
> >>>>> and maybe do some research on
> >>>>> other relevant defaults or sizes (default JVM heap, size of available
> >>>>> memory in common hosts eg EC2
> >>>>> instances, etc). We don't need to worry as much about erring on the
> >>> side
> >>>> of
> >>>>> too big, since other configs like
> >>>>> the max.poll.records will help somewhat to keep it from exploding.
> >>>>>
> >>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name
> >> to
> >>> be
> >>>>> incredibly confusing. Deprecating this in
> >>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new
> >>> input
> >>>>> buffer config sounds good to me to include here.
> >>>>>
> >>>>> 5) The KIP should list all relevant public-facing changes, including
> >>>>> metadata like the config's "Importance". Personally
> >>>>> I would recommend Medium, or even High if we're really worried about
> >>> the
> >>>>> default being wrong for a lot of users
> >>>>>
> >>>>> Thanks for the KIP, besides those few things that Guozhang brought up
> >>> and
> >>>>> the config importance, everything SGTM
> >>>>>
> >>>>> -Sophie
> >>>>>
> >>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> 1) I meant for your proposed solution. I.e. to distribute the
> >>>> configured
> >>>>>> bytes among threads evenly.
> >>>>>>
> >>>>>> 2) I was actually thinking about making the default a large enough
> >>>> value
> >>>>> so
> >>>>>> that we would not introduce performance regression: thinking about
> >> a
> >>>> use
> >>>>>> case with many partitions and each record may be large, then
> >>>> effectively
> >>>>> we
> >>>>>> would only start pausing when the total bytes buffered is pretty
> >>> large.
> >>>>> If
> >>>>>> we set the default value to small, we would be "more aggressive" on
> >>>>> pausing
> >>>>>> which may impact throughput.
> >>>>>>
> >>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
> >>> class
> >>>>>> since that represents the task's all input partitions.
> >>>>>>
> >>>>>> 4) This is just a bold thought, I'm interested to see other's
> >>> thoughts.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sa...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Thanks Guozhang.
> >>>>>>>
> >>>>>>> 1) Just for my confirmation, when you say we should proceed with
> >>> the
> >>>>> even
> >>>>>>> distribution of bytes, are you referring to the Proposed Solution
> >>> in
> >>>>> the
> >>>>>>> KIP or the option you had considered in the JIRA?
> >>>>>>> 2) Default value for the config is something that I missed. I
> >> agree
> >>>> we
> >>>>>>> can't have really large values as it might be detrimental to the
> >>>>>>> performance. Maybe, as a starting point, we assume that only 1
> >>> Stream
> >>>>>> Task
> >>>>>>> is running so what could be the ideal value in such a scenario?
> >>>>> Somewhere
> >>>>>>> around 10MB similar to the caching config?
> >>>>>>> 3) When you say,  *a task level metric indicating the current
> >>> totally
> >>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
> >>> level?
> >>>>>>> 4) I am ok with the name change, but would like to know others'
> >>>>> thoughts.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
> >> wangguoz@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks Sagar for writing this PR.
> >>>>>>>>
> >>>>>>>> I think twice about the options that have been proposed in
> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> >> that
> >>>> at
> >>>>>> the
> >>>>>>>> moment it's simpler to just do the even distribution of the
> >>>>> configured
> >>>>>>>> total bytes. My rationale is that right now we have a static
> >>> tasks
> >>>> ->
> >>>>>>>> threads mapping, and hence each partition would only be fetched
> >>> by
> >>>> a
> >>>>>>> single
> >>>>>>>> thread / consumer at a given time. If in the future we break
> >> that
> >>>>>> static
> >>>>>>>> mapping into dynamic mapping, then we would not be able to do
> >>> this
> >>>>> even
> >>>>>>>> distribution. Instead we would have other threads polling from
> >>>>> consumer
> >>>>>>>> only, and those threads would be responsible for checking the
> >>>> config
> >>>>>> and
> >>>>>>>> pause non-empty partitions if it goes beyond the threshold. But
> >>>> since
> >>>>>> at
> >>>>>>>> that time we would not change the config but just how it would
> >> be
> >>>>>>>> implemented behind the scenes we would not need another KIP to
> >>>> change
> >>>>>> it.
> >>>>>>>>
> >>>>>>>> Some more comments:
> >>>>>>>>
> >>>>>>>> 1. We need to discuss a bit about the default value of this new
> >>>>> config.
> >>>>>>>> Personally I think we need to be a bit conservative with large
> >>>> values
> >>>>>> so
> >>>>>>>> that it would not have any perf regression compared with old
> >>>> configs
> >>>>>>>> especially with large topology and large number of partitions.
> >>>>>>>> 2. I looked at the existing metrics, and do not have
> >>> corresponding
> >>>>>>> sensors.
> >>>>>>>> How about also adding a task level metric indicating the
> >> current
> >>>>>> totally
> >>>>>>>> aggregated metrics. The reason I do not suggest this metric on
> >>> the
> >>>>>>>> per-thread level is that in the future we may break the static
> >>>>> mapping
> >>>>>> of
> >>>>>>>> tasks -> threads.
> >>>>>>>>
> >>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
> >>>> rename
> >>>>>> the
> >>>>>>>> other "*cache.max.bytes.buffering*" as
> >>> "statestore.cache.max.bytes"
> >>>>>> (via
> >>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to
> >>>> hear
> >>>>>>>> others' thoughts.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
> >> sagarmeansocean@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> I would like to start a discussion on the following KIP:
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for the KIP.

There was some discussion about adding a metric on the thread, but the
KIP does not contain anything about it. Did we drop this suggestion or
was the KIP not updated accordingly?


Nit:

> This would be a global config applicable per processing topology

Can we change this to `per Kafka Streams instance.`

Atm, a Stream instance executes a single topology, so it does not make
any effective difference right now. However, it seems better (more
logical) to bind the config to the instance (not the topology the
instance executes).


-Matthias

On 9/2/21 6:08 AM, Sagar wrote:
> Thanks Guozhang and Luke.
> 
> I have updated the KIP with all the suggested changes.
> 
> Do you think we could start voting for this?
> 
> Thanks!
> Sagar.
> 
> On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
> 
>> Thanks for the KIP. Overall LGTM.
>>
>> Just one thought, if we "rename" the config directly as mentioned in the
>> KIP, would that break existing applications?
>> Should we deprecate the old one first, and make the old/new names co-exist
>> for some period of time?
>>
>> Public Interfaces
>>
>>    - Adding a new config *input.buffer.max.bytes *applicable at a topology
>>    level. The importance of this config would be *Medium*.
>>    - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.
>>
>>
>>
>> Thank you.
>> Luke
>>
>> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Currently the state store cache size default value is 10MB today, which
>>> arguably is rather small. So I'm thinking maybe for this config default
>> to
>>> 512MB.
>>>
>>> Other than that, LGTM.
>>>
>>> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
>> wrote:
>>>
>>>> Thanks Guozhang and Sophie.
>>>>
>>>> Yeah a small default value would lower the throughput. I didn't quite
>>>> realise it earlier. It's slightly hard to predict this value so I would
>>>> guess around 1/2 GB to 1 GB? WDYT?
>>>>
>>>> Regarding the renaming of the config and the new metric, sure would
>>> include
>>>> it in the KIP.
>>>>
>>>> Lastly, importance would also. be added. I guess Medium should be ok.
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>>
>>>> On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
>>>> <so...@confluent.io.invalid> wrote:
>>>>
>>>>> 1) I agree that we should just distribute the bytes evenly, at least
>>> for
>>>>> now. It's simpler to understand and
>>>>> we can always change it later, plus it makes sense to keep this
>> aligned
>>>>> with how the cache works today
>>>>>
>>>>> 2) +1 to being conservative in the generous sense, it's just not
>>>> something
>>>>> we can predict with any degree
>>>>> of accuracy and even if we could, the appropriate value is going to
>>>> differ
>>>>> wildly across applications and use
>>>>> cases. We might want to just pick some multiple of the default cache
>>>> size,
>>>>> and maybe do some research on
>>>>> other relevant defaults or sizes (default JVM heap, size of available
>>>>> memory in common hosts eg EC2
>>>>> instances, etc). We don't need to worry as much about erring on the
>>> side
>>>> of
>>>>> too big, since other configs like
>>>>> the max.poll.records will help somewhat to keep it from exploding.
>>>>>
>>>>> 4) 100%, I always found the *cache.max.bytes.buffering* config name
>> to
>>> be
>>>>> incredibly confusing. Deprecating this in
>>>>> favor of "*statestore.cache.max.bytes*" and aligning it to the new
>>> input
>>>>> buffer config sounds good to me to include here.
>>>>>
>>>>> 5) The KIP should list all relevant public-facing changes, including
>>>>> metadata like the config's "Importance". Personally
>>>>> I would recommend Medium, or even High if we're really worried about
>>> the
>>>>> default being wrong for a lot of users
>>>>>
>>>>> Thanks for the KIP, besides those few things that Guozhang brought up
>>> and
>>>>> the config importance, everything SGTM
>>>>>
>>>>> -Sophie
>>>>>
>>>>> On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> 1) I meant for your proposed solution. I.e. to distribute the
>>>> configured
>>>>>> bytes among threads evenly.
>>>>>>
>>>>>> 2) I was actually thinking about making the default a large enough
>>>> value
>>>>> so
>>>>>> that we would not introduce performance regression: thinking about
>> a
>>>> use
>>>>>> case with many partitions and each record may be large, then
>>>> effectively
>>>>> we
>>>>>> would only start pausing when the total bytes buffered is pretty
>>> large.
>>>>> If
>>>>>> we set the default value to small, we would be "more aggressive" on
>>>>> pausing
>>>>>> which may impact throughput.
>>>>>>
>>>>>> 3) Yes exactly, this would naturally be at the "partition-group"
>>> class
>>>>>> since that represents the task's all input partitions.
>>>>>>
>>>>>> 4) This is just a bold thought, I'm interested to see other's
>>> thoughts.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Mon, Aug 23, 2021 at 4:10 AM Sagar <sa...@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> Thanks Guozhang.
>>>>>>>
>>>>>>> 1) Just for my confirmation, when you say we should proceed with
>>> the
>>>>> even
>>>>>>> distribution of bytes, are you referring to the Proposed Solution
>>> in
>>>>> the
>>>>>>> KIP or the option you had considered in the JIRA?
>>>>>>> 2) Default value for the config is something that I missed. I
>> agree
>>>> we
>>>>>>> can't have really large values as it might be detrimental to the
>>>>>>> performance. Maybe, as a starting point, we assume that only 1
>>> Stream
>>>>>> Task
>>>>>>> is running so what could be the ideal value in such a scenario?
>>>>> Somewhere
>>>>>>> around 10MB similar to the caching config?
>>>>>>> 3) When you say,  *a task level metric indicating the current
>>> totally
>>>>>>> aggregated metrics, * you mean the bytes aggregated at a task
>>> level?
>>>>>>> 4) I am ok with the name change, but would like to know others'
>>>>> thoughts.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>> On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
>> wangguoz@gmail.com
>>>>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Sagar for writing this PR.
>>>>>>>>
>>>>>>>> I think twice about the options that have been proposed in
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13152, and feel
>> that
>>>> at
>>>>>> the
>>>>>>>> moment it's simpler to just do the even distribution of the
>>>>> configured
>>>>>>>> total bytes. My rationale is that right now we have a static
>>> tasks
>>>> ->
>>>>>>>> threads mapping, and hence each partition would only be fetched
>>> by
>>>> a
>>>>>>> single
>>>>>>>> thread / consumer at a given time. If in the future we break
>> that
>>>>>> static
>>>>>>>> mapping into dynamic mapping, then we would not be able to do
>>> this
>>>>> even
>>>>>>>> distribution. Instead we would have other threads polling from
>>>>> consumer
>>>>>>>> only, and those threads would be responsible for checking the
>>>> config
>>>>>> and
>>>>>>>> pause non-empty partitions if it goes beyond the threshold. But
>>>> since
>>>>>> at
>>>>>>>> that time we would not change the config but just how it would
>> be
>>>>>>>> implemented behind the scenes we would not need another KIP to
>>>> change
>>>>>> it.
>>>>>>>>
>>>>>>>> Some more comments:
>>>>>>>>
>>>>>>>> 1. We need to discuss a bit about the default value of this new
>>>>> config.
>>>>>>>> Personally I think we need to be a bit conservative with large
>>>> values
>>>>>> so
>>>>>>>> that it would not have any perf regression compared with old
>>>> configs
>>>>>>>> especially with large topology and large number of partitions.
>>>>>>>> 2. I looked at the existing metrics, and do not have
>>> corresponding
>>>>>>> sensors.
>>>>>>>> How about also adding a task level metric indicating the
>> current
>>>>>> totally
>>>>>>>> aggregated metrics. The reason I do not suggest this metric on
>>> the
>>>>>>>> per-thread level is that in the future we may break the static
>>>>> mapping
>>>>>> of
>>>>>>>> tasks -> threads.
>>>>>>>>
>>>>>>>> [optional] As an orthogonal thought, I'm thinking maybe we can
>>>> rename
>>>>>> the
>>>>>>>> other "*cache.max.bytes.buffering*" as
>>> "statestore.cache.max.bytes"
>>>>>> (via
>>>>>>>> deprecation of course), piggy-backed in this KIP? Would like to
>>>> hear
>>>>>>>> others' thoughts.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Aug 22, 2021 at 9:29 AM Sagar <
>> sagarmeansocean@gmail.com
>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I would like to start a discussion on the following KIP:
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Oh you already did -- missed the VOTE thread somehow. Voted :)

On Tue, Sep 7, 2021 at 6:27 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> Yeah, feel free to kick off the vote
>
> On Thu, Sep 2, 2021 at 6:08 AM Sagar <sa...@gmail.com> wrote:
>
>> Thanks Guozhang and Luke.
>>
>> I have updated the KIP with all the suggested changes.
>>
>> Do you think we could start voting for this?
>>
>> Thanks!
>> Sagar.
>>
>> On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
>>
>> > Thanks for the KIP. Overall LGTM.
>> >
>> > Just one thought, if we "rename" the config directly as mentioned in the
>> > KIP, would that break existing applications?
>> > Should we deprecate the old one first, and make the old/new names
>> co-exist
>> > for some period of time?
>> >
>> > Public Interfaces
>> >
>> >    - Adding a new config *input.buffer.max.bytes *applicable at a
>> topology
>> >    level. The importance of this config would be *Medium*.
>> >    - Renaming *cache.max.bytes.buffering* to
>> *statestore.cache.max.bytes*.
>> >
>> >
>> >
>> > Thank you.
>> > Luke
>> >
>> > On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com>
>> wrote:
>> >
>> > > Currently the state store cache size default value is 10MB today,
>> which
>> > > arguably is rather small. So I'm thinking maybe for this config
>> default
>> > to
>> > > 512MB.
>> > >
>> > > Other than that, LGTM.
>> > >
>> > > On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
>> > wrote:
>> > >
>> > > > Thanks Guozhang and Sophie.
>> > > >
>> > > > Yeah a small default value would lower the throughput. I didn't
>> quite
>> > > > realise it earlier. It's slightly hard to predict this value so I
>> would
>> > > > guess around 1/2 GB to 1 GB? WDYT?
>> > > >
>> > > > Regarding the renaming of the config and the new metric, sure would
>> > > include
>> > > > it in the KIP.
>> > > >
>> > > > Lastly, importance would also. be added. I guess Medium should be
>> ok.
>> > > >
>> > > > Thanks!
>> > > > Sagar.
>> > > >
>> > > >
>> > > > On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
>> > > > <so...@confluent.io.invalid> wrote:
>> > > >
>> > > > > 1) I agree that we should just distribute the bytes evenly, at
>> least
>> > > for
>> > > > > now. It's simpler to understand and
>> > > > > we can always change it later, plus it makes sense to keep this
>> > aligned
>> > > > > with how the cache works today
>> > > > >
>> > > > > 2) +1 to being conservative in the generous sense, it's just not
>> > > > something
>> > > > > we can predict with any degree
>> > > > > of accuracy and even if we could, the appropriate value is going
>> to
>> > > > differ
>> > > > > wildly across applications and use
>> > > > > cases. We might want to just pick some multiple of the default
>> cache
>> > > > size,
>> > > > > and maybe do some research on
>> > > > > other relevant defaults or sizes (default JVM heap, size of
>> available
>> > > > > memory in common hosts eg EC2
>> > > > > instances, etc). We don't need to worry as much about erring on
>> the
>> > > side
>> > > > of
>> > > > > too big, since other configs like
>> > > > > the max.poll.records will help somewhat to keep it from exploding.
>> > > > >
>> > > > > 4) 100%, I always found the *cache.max.bytes.buffering* config
>> name
>> > to
>> > > be
>> > > > > incredibly confusing. Deprecating this in
>> > > > > favor of "*statestore.cache.max.bytes*" and aligning it to the new
>> > > input
>> > > > > buffer config sounds good to me to include here.
>> > > > >
>> > > > > 5) The KIP should list all relevant public-facing changes,
>> including
>> > > > > metadata like the config's "Importance". Personally
>> > > > > I would recommend Medium, or even High if we're really worried
>> about
>> > > the
>> > > > > default being wrong for a lot of users
>> > > > >
>> > > > > Thanks for the KIP, besides those few things that Guozhang
>> brought up
>> > > and
>> > > > > the config importance, everything SGTM
>> > > > >
>> > > > > -Sophie
>> > > > >
>> > > > > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wangguoz@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > 1) I meant for your proposed solution. I.e. to distribute the
>> > > > configured
>> > > > > > bytes among threads evenly.
>> > > > > >
>> > > > > > 2) I was actually thinking about making the default a large
>> enough
>> > > > value
>> > > > > so
>> > > > > > that we would not introduce performance regression: thinking
>> about
>> > a
>> > > > use
>> > > > > > case with many partitions and each record may be large, then
>> > > > effectively
>> > > > > we
>> > > > > > would only start pausing when the total bytes buffered is pretty
>> > > large.
>> > > > > If
>> > > > > > we set the default value to small, we would be "more
>> aggressive" on
>> > > > > pausing
>> > > > > > which may impact throughput.
>> > > > > >
>> > > > > > 3) Yes exactly, this would naturally be at the "partition-group"
>> > > class
>> > > > > > since that represents the task's all input partitions.
>> > > > > >
>> > > > > > 4) This is just a bold thought, I'm interested to see other's
>> > > thoughts.
>> > > > > >
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <
>> sagarmeansocean@gmail.com>
>> > > > wrote:
>> > > > > >
>> > > > > > > Thanks Guozhang.
>> > > > > > >
>> > > > > > > 1) Just for my confirmation, when you say we should proceed
>> with
>> > > the
>> > > > > even
>> > > > > > > distribution of bytes, are you referring to the Proposed
>> Solution
>> > > in
>> > > > > the
>> > > > > > > KIP or the option you had considered in the JIRA?
>> > > > > > > 2) Default value for the config is something that I missed. I
>> > agree
>> > > > we
>> > > > > > > can't have really large values as it might be detrimental to
>> the
>> > > > > > > performance. Maybe, as a starting point, we assume that only 1
>> > > Stream
>> > > > > > Task
>> > > > > > > is running so what could be the ideal value in such a
>> scenario?
>> > > > > Somewhere
>> > > > > > > around 10MB similar to the caching config?
>> > > > > > > 3) When you say,  *a task level metric indicating the current
>> > > totally
>> > > > > > > aggregated metrics, * you mean the bytes aggregated at a task
>> > > level?
>> > > > > > > 4) I am ok with the name change, but would like to know
>> others'
>> > > > > thoughts.
>> > > > > > >
>> > > > > > > Thanks!
>> > > > > > > Sagar.
>> > > > > > >
>> > > > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
>> > wangguoz@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Thanks Sagar for writing this PR.
>> > > > > > > >
>> > > > > > > > I think twice about the options that have been proposed in
>> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel
>> > that
>> > > > at
>> > > > > > the
>> > > > > > > > moment it's simpler to just do the even distribution of the
>> > > > > configured
>> > > > > > > > total bytes. My rationale is that right now we have a static
>> > > tasks
>> > > > ->
>> > > > > > > > threads mapping, and hence each partition would only be
>> fetched
>> > > by
>> > > > a
>> > > > > > > single
>> > > > > > > > thread / consumer at a given time. If in the future we break
>> > that
>> > > > > > static
>> > > > > > > > mapping into dynamic mapping, then we would not be able to
>> do
>> > > this
>> > > > > even
>> > > > > > > > distribution. Instead we would have other threads polling
>> from
>> > > > > consumer
>> > > > > > > > only, and those threads would be responsible for checking
>> the
>> > > > config
>> > > > > > and
>> > > > > > > > pause non-empty partitions if it goes beyond the threshold.
>> But
>> > > > since
>> > > > > > at
>> > > > > > > > that time we would not change the config but just how it
>> would
>> > be
>> > > > > > > > implemented behind the scenes we would not need another KIP
>> to
>> > > > change
>> > > > > > it.
>> > > > > > > >
>> > > > > > > > Some more comments:
>> > > > > > > >
>> > > > > > > > 1. We need to discuss a bit about the default value of this
>> new
>> > > > > config.
>> > > > > > > > Personally I think we need to be a bit conservative with
>> large
>> > > > values
>> > > > > > so
>> > > > > > > > that it would not have any perf regression compared with old
>> > > > configs
>> > > > > > > > especially with large topology and large number of
>> partitions.
>> > > > > > > > 2. I looked at the existing metrics, and do not have
>> > > corresponding
>> > > > > > > sensors.
>> > > > > > > > How about also adding a task level metric indicating the
>> > current
>> > > > > > totally
>> > > > > > > > aggregated metrics. The reason I do not suggest this metric
>> on
>> > > the
>> > > > > > > > per-thread level is that in the future we may break the
>> static
>> > > > > mapping
>> > > > > > of
>> > > > > > > > tasks -> threads.
>> > > > > > > >
>> > > > > > > > [optional] As an orthogonal thought, I'm thinking maybe we
>> can
>> > > > rename
>> > > > > > the
>> > > > > > > > other "*cache.max.bytes.buffering*" as
>> > > "statestore.cache.max.bytes"
>> > > > > > (via
>> > > > > > > > deprecation of course), piggy-backed in this KIP? Would
>> like to
>> > > > hear
>> > > > > > > > others' thoughts.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Guozhang
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <
>> > sagarmeansocean@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi All,
>> > > > > > > > >
>> > > > > > > > > I would like to start a discussion on the following KIP:
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
>> > > > > > > > >
>> > > > > > > > > Thanks!
>> > > > > > > > > Sagar.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > -- Guozhang
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Yeah, feel free to kick off the vote

On Thu, Sep 2, 2021 at 6:08 AM Sagar <sa...@gmail.com> wrote:

> Thanks Guozhang and Luke.
>
> I have updated the KIP with all the suggested changes.
>
> Do you think we could start voting for this?
>
> Thanks!
> Sagar.
>
> On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:
>
> > Thanks for the KIP. Overall LGTM.
> >
> > Just one thought, if we "rename" the config directly as mentioned in the
> > KIP, would that break existing applications?
> > Should we deprecate the old one first, and make the old/new names
> co-exist
> > for some period of time?
> >
> > Public Interfaces
> >
> >    - Adding a new config *input.buffer.max.bytes *applicable at a
> topology
> >    level. The importance of this config would be *Medium*.
> >    - Renaming *cache.max.bytes.buffering* to
> *statestore.cache.max.bytes*.
> >
> >
> >
> > Thank you.
> > Luke
> >
> > On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Currently the state store cache size default value is 10MB today, which
> > > arguably is rather small. So I'm thinking maybe for this config default
> > to
> > > 512MB.
> > >
> > > Other than that, LGTM.
> > >
> > > On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
> > wrote:
> > >
> > > > Thanks Guozhang and Sophie.
> > > >
> > > > Yeah a small default value would lower the throughput. I didn't quite
> > > > realise it earlier. It's slightly hard to predict this value so I
> would
> > > > guess around 1/2 GB to 1 GB? WDYT?
> > > >
> > > > Regarding the renaming of the config and the new metric, sure would
> > > include
> > > > it in the KIP.
> > > >
> > > > Lastly, importance would also. be added. I guess Medium should be ok.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > >
> > > > On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> > > > <so...@confluent.io.invalid> wrote:
> > > >
> > > > > 1) I agree that we should just distribute the bytes evenly, at
> least
> > > for
> > > > > now. It's simpler to understand and
> > > > > we can always change it later, plus it makes sense to keep this
> > aligned
> > > > > with how the cache works today
> > > > >
> > > > > 2) +1 to being conservative in the generous sense, it's just not
> > > > something
> > > > > we can predict with any degree
> > > > > of accuracy and even if we could, the appropriate value is going to
> > > > differ
> > > > > wildly across applications and use
> > > > > cases. We might want to just pick some multiple of the default
> cache
> > > > size,
> > > > > and maybe do some research on
> > > > > other relevant defaults or sizes (default JVM heap, size of
> available
> > > > > memory in common hosts eg EC2
> > > > > instances, etc). We don't need to worry as much about erring on the
> > > side
> > > > of
> > > > > too big, since other configs like
> > > > > the max.poll.records will help somewhat to keep it from exploding.
> > > > >
> > > > > 4) 100%, I always found the *cache.max.bytes.buffering* config name
> > to
> > > be
> > > > > incredibly confusing. Deprecating this in
> > > > > favor of "*statestore.cache.max.bytes*" and aligning it to the new
> > > input
> > > > > buffer config sounds good to me to include here.
> > > > >
> > > > > 5) The KIP should list all relevant public-facing changes,
> including
> > > > > metadata like the config's "Importance". Personally
> > > > > I would recommend Medium, or even High if we're really worried
> about
> > > the
> > > > > default being wrong for a lot of users
> > > > >
> > > > > Thanks for the KIP, besides those few things that Guozhang brought
> up
> > > and
> > > > > the config importance, everything SGTM
> > > > >
> > > > > -Sophie
> > > > >
> > > > > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > 1) I meant for your proposed solution. I.e. to distribute the
> > > > configured
> > > > > > bytes among threads evenly.
> > > > > >
> > > > > > 2) I was actually thinking about making the default a large
> enough
> > > > value
> > > > > so
> > > > > > that we would not introduce performance regression: thinking
> about
> > a
> > > > use
> > > > > > case with many partitions and each record may be large, then
> > > > effectively
> > > > > we
> > > > > > would only start pausing when the total bytes buffered is pretty
> > > large.
> > > > > If
> > > > > > we set the default value to small, we would be "more aggressive"
> on
> > > > > pausing
> > > > > > which may impact throughput.
> > > > > >
> > > > > > 3) Yes exactly, this would naturally be at the "partition-group"
> > > class
> > > > > > since that represents the task's all input partitions.
> > > > > >
> > > > > > 4) This is just a bold thought, I'm interested to see other's
> > > thoughts.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sagarmeansocean@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Thanks Guozhang.
> > > > > > >
> > > > > > > 1) Just for my confirmation, when you say we should proceed
> with
> > > the
> > > > > even
> > > > > > > distribution of bytes, are you referring to the Proposed
> Solution
> > > in
> > > > > the
> > > > > > > KIP or the option you had considered in the JIRA?
> > > > > > > 2) Default value for the config is something that I missed. I
> > agree
> > > > we
> > > > > > > can't have really large values as it might be detrimental to
> the
> > > > > > > performance. Maybe, as a starting point, we assume that only 1
> > > Stream
> > > > > > Task
> > > > > > > is running so what could be the ideal value in such a scenario?
> > > > > Somewhere
> > > > > > > around 10MB similar to the caching config?
> > > > > > > 3) When you say,  *a task level metric indicating the current
> > > totally
> > > > > > > aggregated metrics, * you mean the bytes aggregated at a task
> > > level?
> > > > > > > 4) I am ok with the name change, but would like to know others'
> > > > > thoughts.
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Sagar for writing this PR.
> > > > > > > >
> > > > > > > > I think twice about the options that have been proposed in
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> > that
> > > > at
> > > > > > the
> > > > > > > > moment it's simpler to just do the even distribution of the
> > > > > configured
> > > > > > > > total bytes. My rationale is that right now we have a static
> > > tasks
> > > > ->
> > > > > > > > threads mapping, and hence each partition would only be
> fetched
> > > by
> > > > a
> > > > > > > single
> > > > > > > > thread / consumer at a given time. If in the future we break
> > that
> > > > > > static
> > > > > > > > mapping into dynamic mapping, then we would not be able to do
> > > this
> > > > > even
> > > > > > > > distribution. Instead we would have other threads polling
> from
> > > > > consumer
> > > > > > > > only, and those threads would be responsible for checking the
> > > > config
> > > > > > and
> > > > > > > > pause non-empty partitions if it goes beyond the threshold.
> But
> > > > since
> > > > > > at
> > > > > > > > that time we would not change the config but just how it
> would
> > be
> > > > > > > > implemented behind the scenes we would not need another KIP
> to
> > > > change
> > > > > > it.
> > > > > > > >
> > > > > > > > Some more comments:
> > > > > > > >
> > > > > > > > 1. We need to discuss a bit about the default value of this
> new
> > > > > config.
> > > > > > > > Personally I think we need to be a bit conservative with
> large
> > > > values
> > > > > > so
> > > > > > > > that it would not have any perf regression compared with old
> > > > configs
> > > > > > > > especially with large topology and large number of
> partitions.
> > > > > > > > 2. I looked at the existing metrics, and do not have
> > > corresponding
> > > > > > > sensors.
> > > > > > > > How about also adding a task level metric indicating the
> > current
> > > > > > totally
> > > > > > > > aggregated metrics. The reason I do not suggest this metric
> on
> > > the
> > > > > > > > per-thread level is that in the future we may break the
> static
> > > > > mapping
> > > > > > of
> > > > > > > > tasks -> threads.
> > > > > > > >
> > > > > > > > [optional] As an orthogonal thought, I'm thinking maybe we
> can
> > > > rename
> > > > > > the
> > > > > > > > other "*cache.max.bytes.buffering*" as
> > > "statestore.cache.max.bytes"
> > > > > > (via
> > > > > > > > deprecation of course), piggy-backed in this KIP? Would like
> to
> > > > hear
> > > > > > > > others' thoughts.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <
> > sagarmeansocean@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on the following KIP:
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Sagar.
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Sagar <sa...@gmail.com>.
Thanks Guozhang and Luke.

I have updated the KIP with all the suggested changes.

Do you think we could start voting for this?

Thanks!
Sagar.

On Thu, Sep 2, 2021 at 8:26 AM Luke Chen <sh...@gmail.com> wrote:

> Thanks for the KIP. Overall LGTM.
>
> Just one thought, if we "rename" the config directly as mentioned in the
> KIP, would that break existing applications?
> Should we deprecate the old one first, and make the old/new names co-exist
> for some period of time?
>
> Public Interfaces
>
>    - Adding a new config *input.buffer.max.bytes *applicable at a topology
>    level. The importance of this config would be *Medium*.
>    - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.
>
>
>
> Thank you.
> Luke
>
> On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Currently the state store cache size default value is 10MB today, which
> > arguably is rather small. So I'm thinking maybe for this config default
> to
> > 512MB.
> >
> > Other than that, LGTM.
> >
> > On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com>
> wrote:
> >
> > > Thanks Guozhang and Sophie.
> > >
> > > Yeah a small default value would lower the throughput. I didn't quite
> > > realise it earlier. It's slightly hard to predict this value so I would
> > > guess around 1/2 GB to 1 GB? WDYT?
> > >
> > > Regarding the renaming of the config and the new metric, sure would
> > include
> > > it in the KIP.
> > >
> > > Lastly, importance would also. be added. I guess Medium should be ok.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> > > <so...@confluent.io.invalid> wrote:
> > >
> > > > 1) I agree that we should just distribute the bytes evenly, at least
> > for
> > > > now. It's simpler to understand and
> > > > we can always change it later, plus it makes sense to keep this
> aligned
> > > > with how the cache works today
> > > >
> > > > 2) +1 to being conservative in the generous sense, it's just not
> > > something
> > > > we can predict with any degree
> > > > of accuracy and even if we could, the appropriate value is going to
> > > differ
> > > > wildly across applications and use
> > > > cases. We might want to just pick some multiple of the default cache
> > > size,
> > > > and maybe do some research on
> > > > other relevant defaults or sizes (default JVM heap, size of available
> > > > memory in common hosts eg EC2
> > > > instances, etc). We don't need to worry as much about erring on the
> > side
> > > of
> > > > too big, since other configs like
> > > > the max.poll.records will help somewhat to keep it from exploding.
> > > >
> > > > 4) 100%, I always found the *cache.max.bytes.buffering* config name
> to
> > be
> > > > incredibly confusing. Deprecating this in
> > > > favor of "*statestore.cache.max.bytes*" and aligning it to the new
> > input
> > > > buffer config sounds good to me to include here.
> > > >
> > > > 5) The KIP should list all relevant public-facing changes, including
> > > > metadata like the config's "Importance". Personally
> > > > I would recommend Medium, or even High if we're really worried about
> > the
> > > > default being wrong for a lot of users
> > > >
> > > > Thanks for the KIP, besides those few things that Guozhang brought up
> > and
> > > > the config importance, everything SGTM
> > > >
> > > > -Sophie
> > > >
> > > > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > > 1) I meant for your proposed solution. I.e. to distribute the
> > > configured
> > > > > bytes among threads evenly.
> > > > >
> > > > > 2) I was actually thinking about making the default a large enough
> > > value
> > > > so
> > > > > that we would not introduce performance regression: thinking about
> a
> > > use
> > > > > case with many partitions and each record may be large, then
> > > effectively
> > > > we
> > > > > would only start pausing when the total bytes buffered is pretty
> > large.
> > > > If
> > > > > we set the default value to small, we would be "more aggressive" on
> > > > pausing
> > > > > which may impact throughput.
> > > > >
> > > > > 3) Yes exactly, this would naturally be at the "partition-group"
> > class
> > > > > since that represents the task's all input partitions.
> > > > >
> > > > > 4) This is just a bold thought, I'm interested to see other's
> > thoughts.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sa...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks Guozhang.
> > > > > >
> > > > > > 1) Just for my confirmation, when you say we should proceed with
> > the
> > > > even
> > > > > > distribution of bytes, are you referring to the Proposed Solution
> > in
> > > > the
> > > > > > KIP or the option you had considered in the JIRA?
> > > > > > 2) Default value for the config is something that I missed. I
> agree
> > > we
> > > > > > can't have really large values as it might be detrimental to the
> > > > > > performance. Maybe, as a starting point, we assume that only 1
> > Stream
> > > > > Task
> > > > > > is running so what could be the ideal value in such a scenario?
> > > > Somewhere
> > > > > > around 10MB similar to the caching config?
> > > > > > 3) When you say,  *a task level metric indicating the current
> > totally
> > > > > > aggregated metrics, * you mean the bytes aggregated at a task
> > level?
> > > > > > 4) I am ok with the name change, but would like to know others'
> > > > thoughts.
> > > > > >
> > > > > > Thanks!
> > > > > > Sagar.
> > > > > >
> > > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <
> wangguoz@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Thanks Sagar for writing this PR.
> > > > > > >
> > > > > > > I think twice about the options that have been proposed in
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel
> that
> > > at
> > > > > the
> > > > > > > moment it's simpler to just do the even distribution of the
> > > > configured
> > > > > > > total bytes. My rationale is that right now we have a static
> > tasks
> > > ->
> > > > > > > threads mapping, and hence each partition would only be fetched
> > by
> > > a
> > > > > > single
> > > > > > > thread / consumer at a given time. If in the future we break
> that
> > > > > static
> > > > > > > mapping into dynamic mapping, then we would not be able to do
> > this
> > > > even
> > > > > > > distribution. Instead we would have other threads polling from
> > > > consumer
> > > > > > > only, and those threads would be responsible for checking the
> > > config
> > > > > and
> > > > > > > pause non-empty partitions if it goes beyond the threshold. But
> > > since
> > > > > at
> > > > > > > that time we would not change the config but just how it would
> be
> > > > > > > implemented behind the scenes we would not need another KIP to
> > > change
> > > > > it.
> > > > > > >
> > > > > > > Some more comments:
> > > > > > >
> > > > > > > 1. We need to discuss a bit about the default value of this new
> > > > config.
> > > > > > > Personally I think we need to be a bit conservative with large
> > > values
> > > > > so
> > > > > > > that it would not have any perf regression compared with old
> > > configs
> > > > > > > especially with large topology and large number of partitions.
> > > > > > > 2. I looked at the existing metrics, and do not have
> > corresponding
> > > > > > sensors.
> > > > > > > How about also adding a task level metric indicating the
> current
> > > > > totally
> > > > > > > aggregated metrics. The reason I do not suggest this metric on
> > the
> > > > > > > per-thread level is that in the future we may break the static
> > > > mapping
> > > > > of
> > > > > > > tasks -> threads.
> > > > > > >
> > > > > > > [optional] As an orthogonal thought, I'm thinking maybe we can
> > > rename
> > > > > the
> > > > > > > other "*cache.max.bytes.buffering*" as
> > "statestore.cache.max.bytes"
> > > > > (via
> > > > > > > deprecation of course), piggy-backed in this KIP? Would like to
> > > hear
> > > > > > > others' thoughts.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <
> sagarmeansocean@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > I would like to start a discussion on the following KIP:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Sagar.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: [DISCUSS] KIP-770: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

Posted by Luke Chen <sh...@gmail.com>.
Thanks for the KIP. Overall LGTM.

Just one thought, if we "rename" the config directly as mentioned in the
KIP, would that break existing applications?
Should we deprecate the old one first, and make the old/new names co-exist
for some period of time?

Public Interfaces

   - Adding a new config *input.buffer.max.bytes *applicable at a topology
   level. The importance of this config would be *Medium*.
   - Renaming *cache.max.bytes.buffering* to *statestore.cache.max.bytes*.



Thank you.
Luke

On Thu, Sep 2, 2021 at 1:50 AM Guozhang Wang <wa...@gmail.com> wrote:

> Currently the state store cache size default value is 10MB today, which
> arguably is rather small. So I'm thinking maybe for this config default to
> 512MB.
>
> Other than that, LGTM.
>
> On Sat, Aug 28, 2021 at 11:34 AM Sagar <sa...@gmail.com> wrote:
>
> > Thanks Guozhang and Sophie.
> >
> > Yeah a small default value would lower the throughput. I didn't quite
> > realise it earlier. It's slightly hard to predict this value so I would
> > guess around 1/2 GB to 1 GB? WDYT?
> >
> > Regarding the renaming of the config and the new metric, sure would
> include
> > it in the KIP.
> >
> > Lastly, importance would also. be added. I guess Medium should be ok.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Sat, Aug 28, 2021 at 10:42 AM Sophie Blee-Goldman
> > <so...@confluent.io.invalid> wrote:
> >
> > > 1) I agree that we should just distribute the bytes evenly, at least
> for
> > > now. It's simpler to understand and
> > > we can always change it later, plus it makes sense to keep this aligned
> > > with how the cache works today
> > >
> > > 2) +1 to being conservative in the generous sense, it's just not
> > something
> > > we can predict with any degree
> > > of accuracy and even if we could, the appropriate value is going to
> > differ
> > > wildly across applications and use
> > > cases. We might want to just pick some multiple of the default cache
> > size,
> > > and maybe do some research on
> > > other relevant defaults or sizes (default JVM heap, size of available
> > > memory in common hosts eg EC2
> > > instances, etc). We don't need to worry as much about erring on the
> side
> > of
> > > too big, since other configs like
> > > the max.poll.records will help somewhat to keep it from exploding.
> > >
> > > 4) 100%, I always found the *cache.max.bytes.buffering* config name to
> be
> > > incredibly confusing. Deprecating this in
> > > favor of "*statestore.cache.max.bytes*" and aligning it to the new
> input
> > > buffer config sounds good to me to include here.
> > >
> > > 5) The KIP should list all relevant public-facing changes, including
> > > metadata like the config's "Importance". Personally
> > > I would recommend Medium, or even High if we're really worried about
> the
> > > default being wrong for a lot of users
> > >
> > > Thanks for the KIP, besides those few things that Guozhang brought up
> and
> > > the config importance, everything SGTM
> > >
> > > -Sophie
> > >
> > > On Thu, Aug 26, 2021 at 2:41 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > 1) I meant for your proposed solution. I.e. to distribute the
> > configured
> > > > bytes among threads evenly.
> > > >
> > > > 2) I was actually thinking about making the default a large enough
> > value
> > > so
> > > > that we would not introduce performance regression: thinking about a
> > use
> > > > case with many partitions and each record may be large, then
> > effectively
> > > we
> > > > would only start pausing when the total bytes buffered is pretty
> large.
> > > If
> > > > we set the default value to small, we would be "more aggressive" on
> > > pausing
> > > > which may impact throughput.
> > > >
> > > > 3) Yes exactly, this would naturally be at the "partition-group"
> class
> > > > since that represents the task's all input partitions.
> > > >
> > > > 4) This is just a bold thought, I'm interested to see other's
> thoughts.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Aug 23, 2021 at 4:10 AM Sagar <sa...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks Guozhang.
> > > > >
> > > > > 1) Just for my confirmation, when you say we should proceed with
> the
> > > even
> > > > > distribution of bytes, are you referring to the Proposed Solution
> in
> > > the
> > > > > KIP or the option you had considered in the JIRA?
> > > > > 2) Default value for the config is something that I missed. I agree
> > we
> > > > > can't have really large values as it might be detrimental to the
> > > > > performance. Maybe, as a starting point, we assume that only 1
> Stream
> > > > Task
> > > > > is running so what could be the ideal value in such a scenario?
> > > Somewhere
> > > > > around 10MB similar to the caching config?
> > > > > 3) When you say,  *a task level metric indicating the current
> totally
> > > > > aggregated metrics, * you mean the bytes aggregated at a task
> level?
> > > > > 4) I am ok with the name change, but would like to know others'
> > > thoughts.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Sun, Aug 22, 2021 at 11:54 PM Guozhang Wang <wangguoz@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Thanks Sagar for writing this PR.
> > > > > >
> > > > > > I think twice about the options that have been proposed in
> > > > > > https://issues.apache.org/jira/browse/KAFKA-13152, and feel that
> > at
> > > > the
> > > > > > moment it's simpler to just do the even distribution of the
> > > configured
> > > > > > total bytes. My rationale is that right now we have a static
> tasks
> > ->
> > > > > > threads mapping, and hence each partition would only be fetched
> by
> > a
> > > > > single
> > > > > > thread / consumer at a given time. If in the future we break that
> > > > static
> > > > > > mapping into dynamic mapping, then we would not be able to do
> this
> > > even
> > > > > > distribution. Instead we would have other threads polling from
> > > consumer
> > > > > > only, and those threads would be responsible for checking the
> > config
> > > > and
> > > > > > pause non-empty partitions if it goes beyond the threshold. But
> > since
> > > > at
> > > > > > that time we would not change the config but just how it would be
> > > > > > implemented behind the scenes we would not need another KIP to
> > change
> > > > it.
> > > > > >
> > > > > > Some more comments:
> > > > > >
> > > > > > 1. We need to discuss a bit about the default value of this new
> > > config.
> > > > > > Personally I think we need to be a bit conservative with large
> > values
> > > > so
> > > > > > that it would not have any perf regression compared with old
> > configs
> > > > > > especially with large topology and large number of partitions.
> > > > > > 2. I looked at the existing metrics, and do not have
> corresponding
> > > > > sensors.
> > > > > > How about also adding a task level metric indicating the current
> > > > totally
> > > > > > aggregated metrics. The reason I do not suggest this metric on
> the
> > > > > > per-thread level is that in the future we may break the static
> > > mapping
> > > > of
> > > > > > tasks -> threads.
> > > > > >
> > > > > > [optional] As an orthogonal thought, I'm thinking maybe we can
> > rename
> > > > the
> > > > > > other "*cache.max.bytes.buffering*" as
> "statestore.cache.max.bytes"
> > > > (via
> > > > > > deprecation of course), piggy-backed in this KIP? Would like to
> > hear
> > > > > > others' thoughts.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sun, Aug 22, 2021 at 9:29 AM Sagar <sagarmeansocean@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I would like to start a discussion on the following KIP:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>