Posted to dev@kafka.apache.org by Luke Chen <sh...@gmail.com> on 2021/10/22 08:52:08 UTC

[VOTE] KIP-782: Expandable batch size in producer

Hi Kafka dev,
I'd like to start a vote for the proposal: KIP-782: Expandable batch size
in producer.

The main purpose of this KIP is to improve memory usage in the producer
and to save users from the dilemma of choosing a batch size. After this
KIP, users can set a higher batch.size without worry, provided they also
set an appropriate "batch.initial.size".

Detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Any comments and feedback are welcome.

Thank you.
Luke

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Luke Chen <sh...@gmail.com>.
Hi devs,
Bumping this thread.
Calling for votes on KIP-782: Expandable batch size in producer.

The main goals of this KIP are:
1. higher throughput in the producer
2. better memory usage in the producer

Detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Any feedback and comments are welcome.

Thank you.
Luke

On Fri, Nov 5, 2021 at 4:37 PM Luke Chen <sh...@gmail.com> wrote:

> Hi Mickael,
> Thanks for the good comments! Answering them below:
>
> - When under load, the producer may allocate extra buffers. Are these
> buffers ever released if the load drops?
> --> This is a good point that I hadn't considered before. Yes, after
> introducing "batch.max.size", we should release some buffers from the
> buffer pool. In this KIP, we'll keep at most "batch.size" in the pool
> and mark the rest of the memory as free to use. The reason we return at
> most "batch.size" to the pool is that, semantically, "batch.size" is the
> batch-full limit. In most cases, batch.size should be enough to hold the
> records to be sent within linger.ms.
>
> - Do we really need batch.initial.size? It's not clear that having this
> extra setting adds a lot of value.
> --> I think "batch.initial.size" is important for achieving better memory
> usage. I've now set its default value to 4KB, so after upgrading to the
> new release, producer memory usage will improve.
>
> I've updated the KIP.
>
> Thank you.
> Luke

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Luke Chen <sh...@gmail.com>.
Hi Mickael,
Thanks for the good comments! Answering them below:

- When under load, the producer may allocate extra buffers. Are these
buffers ever released if the load drops?
--> This is a good point that I hadn't considered before. Yes, after
introducing "batch.max.size", we should release some buffers from the
buffer pool. In this KIP, we'll keep at most "batch.size" in the pool
and mark the rest of the memory as free to use. The reason we return at
most "batch.size" to the pool is that, semantically, "batch.size" is the
batch-full limit. In most cases, batch.size should be enough to hold the
records to be sent within linger.ms.
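
The release policy in the answer above can be sketched roughly as follows.
This is a minimal illustration only, not Kafka's actual BufferPool: the
class SketchBufferPool, its methods, and the idea of modelling a batch as a
list of fixed-size chunks are all assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified pool; it only illustrates the release policy. */
final class SketchBufferPool {
    private final int chunkSize;   // assumed to play the role of "batch.initial.size"
    private final int batchSize;   // assumed to play the role of "batch.size"
    private final Deque<ByteBuffer> pooled = new ArrayDeque<>();
    private long availableMemory;  // bytes not currently held by any buffer

    SketchBufferPool(long totalMemory, int chunkSize, int batchSize) {
        this.availableMemory = totalMemory;
        this.chunkSize = chunkSize;
        this.batchSize = batchSize;
    }

    /** Hand out one chunk, reusing a pooled chunk when one is available. */
    ByteBuffer allocateChunk() {
        ByteBuffer reused = pooled.pollFirst();
        if (reused != null) {
            return reused;
        }
        if (availableMemory < chunkSize) {
            throw new IllegalStateException("out of buffer memory");
        }
        availableMemory -= chunkSize;
        return ByteBuffer.allocate(chunkSize);
    }

    /**
     * Release the chunks of one finished batch: keep at most batchSize bytes
     * in the pool and credit the remainder back as free memory.
     */
    void release(List<ByteBuffer> batchChunks) {
        int kept = 0;
        for (ByteBuffer chunk : batchChunks) {
            if (kept + chunk.capacity() <= batchSize) {
                chunk.clear();
                pooled.addLast(chunk);
                kept += chunk.capacity();
            } else {
                // Not pooled: the chunk is dropped and its bytes become free.
                availableMemory += chunk.capacity();
            }
        }
    }

    int pooledChunks() { return pooled.size(); }

    long availableMemory() { return availableMemory; }
}
```

With 4KB chunks and a 16KB batch.size, releasing a batch that grew to 32KB
would leave four chunks pooled and return the other 16KB to free memory.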

- Do we really need batch.initial.size? It's not clear that having this
extra setting adds a lot of value.
--> I think "batch.initial.size" is important for achieving better memory
usage. I've now set its default value to 4KB, so after upgrading to the
new release, producer memory usage will improve.
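
For illustration, the three sizes discussed in this thread could be set
together as below. Note the assumptions: "batch.initial.size" and
"batch.max.size" are configs proposed by this KIP, not settings known to
exist in a released client; the 256KB value is the one suggested earlier in
the thread; and the ordering check (initial <= batch <= max) is an assumed
sanity rule, not something stated in the thread. The class name
ProposedBatchConfig is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper showing the three sizes side by side. */
final class ProposedBatchConfig {

    static Properties sketch() {
        Properties props = new Properties();
        // Proposed by KIP-782; 4KB is the default mentioned in this thread.
        props.setProperty("batch.initial.size", "4096");
        // Existing producer config; 16KB is its current default.
        props.setProperty("batch.size", "16384");
        // Proposed by KIP-782; 256KB is the value suggested in this thread.
        props.setProperty("batch.max.size", "262144");
        return props;
    }

    /** Assumed sanity check: initial <= soft limit <= hard limit. */
    static void validate(Properties props) {
        int initial = Integer.parseInt(props.getProperty("batch.initial.size"));
        int soft = Integer.parseInt(props.getProperty("batch.size"));
        int hard = Integer.parseInt(props.getProperty("batch.max.size"));
        if (initial > soft || soft > hard) {
            throw new IllegalArgumentException(
                "expected batch.initial.size <= batch.size <= batch.max.size");
        }
    }
}
```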

I've updated the KIP.

Thank you.
Luke

On Wed, Nov 3, 2021 at 6:44 PM Mickael Maison <mi...@gmail.com>
wrote:

> Hi Luke,
>
> Thanks for the KIP. It looks like an interesting idea. I like the
> concept of dynamically adjusting settings to handle load. I wonder if
> other client settings could also benefit from a similar logic.
>
> Just a couple of questions:
> - When under load, the producer may allocate extra buffers. Are these
> buffers ever released if the load drops?
> - Do we really need batch.initial.size? It's not clear that having
> this extra setting adds a lot of value.
>
> Thanks,
> Mickael

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Mickael Maison <mi...@gmail.com>.
Hi Luke,

Thanks for the KIP. It looks like an interesting idea. I like the
concept of dynamically adjusting settings to handle load. I wonder if
other client settings could also benefit from a similar logic.

Just a couple of questions:
- When under load, the producer may allocate extra buffers. Are these
buffers ever released if the load drops?
- Do we really need batch.initial.size? It's not clear that having
this extra setting adds a lot of value.

Thanks,
Mickael

On Tue, Oct 26, 2021 at 11:12 AM Luke Chen <sh...@gmail.com> wrote:
>
> Thank you, Artem!
>
> @devs, you're welcome to vote on this KIP.
> Key proposal:
> 1. Allocate multiple smaller buffers of the initial batch size in the
> producer, and link them together in a list as the batch expands, for
> better memory usage.
> 2. Add a maximum batch size config to the producer, so that when the
> produce rate suddenly spikes, we can still get high throughput with
> batches larger than "batch.size" (and no larger than "batch.max.size",
> where "batch.size" is the soft limit and "batch.max.size" is the hard
> limit).
> Here's the updated KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>
> Any comments and feedback are welcome.
>
> Thank you.
> Luke

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Luke Chen <sh...@gmail.com>.
Thank you, Artem!

@devs, you're welcome to vote on this KIP.
Key proposal:
1. Allocate multiple smaller buffers of the initial batch size in the
producer, and link them together in a list as the batch expands, for
better memory usage.
2. Add a maximum batch size config to the producer, so that when the
produce rate suddenly spikes, we can still get high throughput with
batches larger than "batch.size" (and no larger than "batch.max.size",
where "batch.size" is the soft limit and "batch.max.size" is the hard
limit).
Here's the updated KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Any comments and feedback are welcome.
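
The two points of the key proposal can be sketched as a toy batch that
grows in small chunks. This is only an illustration under stated
assumptions, not the KIP's implementation: the class ExpandableBatchSketch
and its methods are hypothetical, linger.ms is ignored, and chunks are
plain byte arrays rather than pooled buffers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batch that expands chunk by chunk up to a hard limit. */
final class ExpandableBatchSketch {
    private final int initialSize;  // chunk size, i.e. "batch.initial.size"
    private final int batchSize;    // soft limit, i.e. "batch.size"
    private final int maxSize;      // hard limit, i.e. "batch.max.size"
    private final List<byte[]> chunks = new ArrayList<>();
    private int allocated;          // total bytes across all chunks
    private int bytesWritten;

    ExpandableBatchSketch(int initialSize, int batchSize, int maxSize) {
        if (initialSize <= 0 || initialSize > batchSize || batchSize > maxSize) {
            throw new IllegalArgumentException("need 0 < initial <= batch <= max");
        }
        this.initialSize = initialSize;
        this.batchSize = batchSize;
        this.maxSize = maxSize;
    }

    /** Returns false when the record would push the batch past the hard limit. */
    boolean tryAppend(byte[] record) {
        if (bytesWritten + record.length > maxSize) {
            return false;  // the caller would start a new batch here
        }
        // Expand one chunk at a time until the record fits.
        while (allocated < bytesWritten + record.length) {
            byte[] chunk = new byte[Math.min(initialSize, maxSize - allocated)];
            chunks.add(chunk);
            allocated += chunk.length;
        }
        // Copy the record across chunk boundaries.
        int src = 0;
        while (src < record.length) {
            byte[] chunk = chunks.get(bytesWritten / initialSize);
            int offset = bytesWritten % initialSize;
            int n = Math.min(record.length - src, chunk.length - offset);
            System.arraycopy(record, src, chunk, offset, n);
            src += n;
            bytesWritten += n;
        }
        return true;
    }

    /** Soft limit: the batch may be sent, but can still accept more records. */
    boolean isReady() { return bytesWritten >= batchSize; }

    int chunkCount() { return chunks.size(); }

    int sizeInBytes() { return bytesWritten; }
}
```

The point of the sketch is that memory is committed only as records arrive,
while the soft and hard limits stay separate decisions.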

Thank you.
Luke

On Tue, Oct 26, 2021 at 6:35 AM Artem Livshits
<al...@confluent.io.invalid> wrote:

> Hi Luke,
>
> I've looked at the updated KIP-782; it looks good to me.
>
> -Artem

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Artem Livshits <al...@confluent.io.INVALID>.
Hi Luke,

I've looked at the updated KIP-782; it looks good to me.

-Artem

On Sun, Oct 24, 2021 at 1:46 AM Luke Chen <sh...@gmail.com> wrote:

> Hi Artem,
> Thanks for your good suggestion again.
> I've combined your idea into this KIP, and updated it.
> Note that, in the end, I kept the "batch.initial.size" config (the
> default is 0, which means "batch.size" is used as the initial batch
> size) for better memory conservation.
>
> Detailed description can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>
> Let me know if you have other suggestions.
>
> Thank you.
> Luke
>
> On Sat, Oct 23, 2021 at 10:50 AM Luke Chen <sh...@gmail.com> wrote:
>
>> Hi Artem,
>> Thanks for the suggestion. Let me confirm that my understanding is
>> correct. So, what you suggest is that "batch.size" is more like a "soft
>> limit" on the batch size, and the "hard limit" is "batch.max.size". When
>> the buffer reaches batch.size, it is "ready" to be sent. But if more
>> data arrives before linger.ms expires, we can still accumulate it into
>> the same buffer, until it reaches "batch.max.size". After it reaches
>> "batch.max.size", we'll create another batch for it.
>>
>> So with your suggestion, we won't need "batch.initial.size", and we can
>> use "batch.size" as the initial batch size. We link buffers of
>> "batch.size" together until the batch reaches "batch.max.size".
>> Something like this:
>>
>> [image: image.png]
>> Is my understanding correct?
>> If so, that sounds good to me.
>> If not, please kindly explain more to me.
>>
>> Thank you.
>> Luke
>>
>>
>>
>>
>> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
>> <al...@confluent.io.invalid> wrote:
>>
>>> Hi Luke,
>>>
>>> Nice suggestion.  It should optimize how memory is used with different
>>> production rates, but I wonder if we can take this idea further and
>>> improve batching in general.
>>>
>>> Currently batch.size is used in two conditions:
>>>
>>> 1. When we append records to a batch in the accumulator, we create a new
>>> batch if the current batch would exceed the batch.size.
>>> 2. When we drain the batch from the accumulator, a batch becomes 'ready'
>>> when it reaches batch.size.
>>>
>>> The second condition is good with the current batch size, because if
>>> linger.ms is greater than 0, the send can be triggered by accomplishing
>>> the batching goal.
>>>
>>> The first condition, though, leads to creating many batches if the
>>> network latency or production rate (or both) is high, and with 5
>>> in-flight requests and 16KB batches we can only have 80KB of data
>>> in-flight per partition, which means that with 50ms latency, we can
>>> only push ~1.6MB/sec per partition (this goes down if we consider
>>> higher latencies, e.g. with 100ms we can only push ~0.8MB/sec).
>>>
>>> I think it would be great to separate the two sizes:
>>>
>>> 1. When appending records to a batch, create a new batch if the current
>>> batch exceeds a larger size (we can call it batch.max.size), say 256KB
>>> by default.
>>> 2. When we drain, consider a batch 'ready' if it exceeds batch.size,
>>> which is 16KB by default.
>>>
>>> For memory conservation we may introduce batch.initial.size if we want
>>> to have the flexibility to make it even smaller than batch.size, or we
>>> can just always use batch.size as the initial size (in which case we
>>> don't need the batch.initial.size config).
>>>
>>> -Artem
>>>
>>> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <sh...@gmail.com> wrote:
>>>
>>> > Hi Kafka dev,
>>> > I'd like to start a vote for the proposal: KIP-782: Expandable batch
>>> size
>>> > in producer.
>>> >
>>> > The main purpose for this KIP is to have better memory usage in
>>> producer,
>>> > and also save users from the dilemma while setting the batch size
>>> > configuration. After this KIP, users can set a higher batch.size
>>> without
>>> > worries, and of course, with an appropriate "batch.initial.size".
>>> >
>>> > Detailed description can be found here:
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>>> >
>>> > Any comments and feedback are welcome.
>>> >
>>> > Thank you.
>>> > Luke
>>> >
>>>
>>

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Luke Chen <sh...@gmail.com>.
Hi Artem,
Thanks again for the good suggestion.
I've incorporated your idea into the KIP and updated it.
Note that I've kept the "batch.initial.size" config (default is 0, which
means "batch.size" is used as the initial batch size) for better memory
conservation.
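
A minimal sketch of how that default could be resolved, assuming the
semantics described above (0 means "use batch.size"). The class and method
names are hypothetical and not part of the producer API; how a value larger
than "batch.size" should be treated is not specified here, so the sketch
passes it through unchanged.

```java
// Hypothetical helper; illustrates the proposed default only.
final class InitialBatchSize {
    /**
     * Resolves the size of the first buffer allocated for a batch.
     * batch.initial.size == 0 means "fall back to batch.size".
     */
    static int effectiveInitialSize(int batchInitialSize, int batchSize) {
        if (batchInitialSize < 0 || batchSize < 0) {
            throw new IllegalArgumentException("sizes must be non-negative");
        }
        return batchInitialSize == 0 ? batchSize : batchInitialSize;
    }

    public static void main(String[] args) {
        // Default: no explicit initial size, so batch.size is used.
        System.out.println(effectiveInitialSize(0, 16 * 1024));
        // Explicit smaller initial size for memory conservation.
        System.out.println(effectiveInitialSize(4 * 1024, 16 * 1024));
    }
}
```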

Detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Let me know if you have other suggestions.

Thank you.
Luke

On Sat, Oct 23, 2021 at 10:50 AM Luke Chen <sh...@gmail.com> wrote:

> Hi Artem,
> Thanks for the suggestion. Let me confirm my understanding is correct.
> So, what you suggest is that the "batch.size" is more like a "soft limit"
> batch size, and the "hard limit" is "batch.max.size". When reaching the
> batch.size of the buffer, it means the buffer is "ready" to be sent. But
> before the linger.ms reached, if there are more data coming, we can still
> accumulate it into the same buffer, until it reached the "batch.max.size".
> After it reached the "batch.max.size", we'll create another batch for it.
>
> So after your suggestion, we won't need the "batch.initial.size", and we
> can use "batch.size" as the initial batch size. We list each "batch.size"
> together, until it reached "batch.max.size". Something like this:
>
> [image: image.png]
> Is my understanding correct?
> If so, that sounds good to me.
> If not, please kindly explain more to me.
>
> Thank you.
> Luke
>
>
>
>
> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
> <al...@confluent.io.invalid> wrote:
>
>> Hi Luke,
>>
>> Nice suggestion.  It should optimize how memory is used with different
>> production rates, but I wonder if we can take this idea further and
>> improve
>> batching in general.
>>
>> Currently batch.size is used in two conditions:
>>
>> 1. When we append records to a batch in the accumulator, we create a new
>> batch if the current batch would exceed the batch.size.
>> 2. When we drain the batch from the accumulator, a batch becomes 'ready'
>> when it reaches batch.size.
>>
>> The second condition is good with the current batch size, because if
>> linger.ms is greater than 0, the send can be triggered by accomplishing
>> the
>> batching goal.
>>
>> The first condition, though, leads to creating many batches if the network
>> latency or production rate (or both) is high, and with 5 in-flight and
>> 16KB
>> batches we can only have 80KB of data in-flight per partition.  Which
>> means
>> that with 50ms latency, we can only push ~1.6MB/sec per partition (this
>> goes down if we consider higher latencies, e.g. with 100ms we can only
>> push
>> ~0.8MB/sec).
>>
>> I think it would be great to separate the two sizes:
>>
>> 1. When appending records to a batch, create a new batch if the current
>> exceeds a larger size (we can call it batch.max.size), say 256KB by
>> default.
>> 2. When we drain, consider batch 'ready' if it exceeds batch.size, which
>> is
>> 16KB by default.
>>
>> For memory conservation we may introduce batch.initial.size if we want to
>> have a flexibility to make it even smaller than batch.size, or we can just
>> always use batch.size as the initial size (in which case we don't
>> need batch.initial.size config).
>>
>> -Artem
>>
>> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <sh...@gmail.com> wrote:
>>
>> > Hi Kafka dev,
>> > I'd like to start a vote for the proposal: KIP-782: Expandable batch
>> size
>> > in producer.
>> >
>> > The main purpose for this KIP is to have better memory usage in
>> producer,
>> > and also save users from the dilemma while setting the batch size
>> > configuration. After this KIP, users can set a higher batch.size without
>> > worries, and of course, with an appropriate "batch.initial.size".
>> >
>> > Detailed description can be found here:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>> >
>> > Any comments and feedback are welcome.
>> >
>> > Thank you.
>> > Luke
>> >
>>
>

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Luke Chen <sh...@gmail.com>.
Hi Artem,
Thanks for the suggestion. Let me confirm that my understanding is correct.
What you suggest is that "batch.size" acts more like a "soft limit" on the
batch size, and the "hard limit" is "batch.max.size". When the buffer
reaches batch.size, it is "ready" to be sent. But if more data arrives
before linger.ms expires, we can still accumulate it into the same buffer
until it reaches "batch.max.size". After it reaches "batch.max.size",
we'll create another batch for it.

So with your suggestion, we won't need "batch.initial.size", and we can use
"batch.size" as the initial batch size. We link "batch.size"-sized buffers
together until the total reaches "batch.max.size". Something like this:

[image: image.png]
Is my understanding correct?
If so, that sounds good to me.
If not, please kindly explain more to me.
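
A minimal sketch of the linked-buffer idea described above, assuming the
batch grows in "batch.size"-sized steps up to "batch.max.size". It only
tracks byte counts rather than real buffers, and the class and method names
are hypothetical; this is not how the producer's accumulator is written.

```java
// Hypothetical model of a batch that grows one batch.size chunk at a time.
final class ChainedBatch {
    private final int chunkBytes; // batch.size: soft limit and growth step
    private final int maxBytes;   // batch.max.size: hard limit (proposed config)
    private int chunks = 1;       // start with a single batch.size chunk
    private int usedBytes = 0;

    ChainedBatch(int chunkBytes, int maxBytes) {
        if (chunkBytes <= 0 || maxBytes < chunkBytes) {
            throw new IllegalArgumentException("need 0 < batch.size <= batch.max.size");
        }
        this.chunkBytes = chunkBytes;
        this.maxBytes = maxBytes;
    }

    /** Returns false when the record would exceed the hard limit; the caller then starts a new batch. */
    boolean tryAppend(int recordBytes) {
        if (usedBytes + recordBytes > maxBytes) {
            return false;
        }
        // Link additional chunks until the record fits.
        while (usedBytes + recordBytes > chunks * chunkBytes) {
            chunks++;
        }
        usedBytes += recordBytes;
        return true;
    }

    /** The batch is "ready" to send once it reaches the soft limit. */
    boolean isReady() {
        return usedBytes >= chunkBytes;
    }

    /** Memory currently held by this batch, capped at the hard limit. */
    int allocatedBytes() {
        return Math.min(chunks * chunkBytes, maxBytes);
    }

    int usedBytes() {
        return usedBytes;
    }

    public static void main(String[] args) {
        ChainedBatch batch = new ChainedBatch(16 * 1024, 64 * 1024);
        batch.tryAppend(10_000);
        System.out.println("ready=" + batch.isReady() + " allocated=" + batch.allocatedBytes());
        batch.tryAppend(10_000);
        System.out.println("ready=" + batch.isReady() + " allocated=" + batch.allocatedBytes());
    }
}
```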

Thank you.
Luke




On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
<al...@confluent.io.invalid> wrote:

> Hi Luke,
>
> Nice suggestion.  It should optimize how memory is used with different
> production rates, but I wonder if we can take this idea further and improve
> batching in general.
>
> Currently batch.size is used in two conditions:
>
> 1. When we append records to a batch in the accumulator, we create a new
> batch if the current batch would exceed the batch.size.
> 2. When we drain the batch from the accumulator, a batch becomes 'ready'
> when it reaches batch.size.
>
> The second condition is good with the current batch size, because if
> linger.ms is greater than 0, the send can be triggered by accomplishing
> the
> batching goal.
>
> The first condition, though, leads to creating many batches if the network
> latency or production rate (or both) is high, and with 5 in-flight and 16KB
> batches we can only have 80KB of data in-flight per partition.  Which means
> that with 50ms latency, we can only push ~1.6MB/sec per partition (this
> goes down if we consider higher latencies, e.g. with 100ms we can only push
> ~0.8MB/sec).
>
> I think it would be great to separate the two sizes:
>
> 1. When appending records to a batch, create a new batch if the current
> exceeds a larger size (we can call it batch.max.size), say 256KB by
> default.
> 2. When we drain, consider batch 'ready' if it exceeds batch.size, which is
> 16KB by default.
>
> For memory conservation we may introduce batch.initial.size if we want to
> have a flexibility to make it even smaller than batch.size, or we can just
> always use batch.size as the initial size (in which case we don't
> need batch.initial.size config).
>
> -Artem
>
> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <sh...@gmail.com> wrote:
>
> > Hi Kafka dev,
> > I'd like to start a vote for the proposal: KIP-782: Expandable batch size
> > in producer.
> >
> > The main purpose for this KIP is to have better memory usage in producer,
> > and also save users from the dilemma while setting the batch size
> > configuration. After this KIP, users can set a higher batch.size without
> > worries, and of course, with an appropriate "batch.initial.size".
> >
> > Detailed description can be found here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> >
> > Any comments and feedback are welcome.
> >
> > Thank you.
> > Luke
> >
>

Re: [VOTE] KIP-782: Expandable batch size in producer

Posted by Artem Livshits <al...@confluent.io.INVALID>.
Hi Luke,

Nice suggestion.  It should optimize how memory is used with different
production rates, but I wonder if we can take this idea further and improve
batching in general.

Currently batch.size is used in two conditions:

1. When we append records to a batch in the accumulator, we create a new
batch if the current batch would exceed the batch.size.
2. When we drain the batch from the accumulator, a batch becomes 'ready'
when it reaches batch.size.

The second condition is good with the current batch size, because if
linger.ms is greater than 0, the send can be triggered by accomplishing the
batching goal.

The first condition, though, leads to creating many batches if the network
latency or production rate (or both) is high. With 5 in-flight requests and
16KB batches, we can only have 80KB of data in flight per partition, which
means that with 50ms latency we can only push ~1.6MB/sec per partition
(this goes down with higher latencies, e.g. with 100ms we can only push
~0.8MB/sec).
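
The arithmetic behind those figures can be checked with a short sketch. It
assumes each in-flight request carries exactly one full batch for the
partition and completes after the stated latency; the class and method
names are hypothetical. The 256KB rows are included only to show how the
bound scales with a larger batch.

```java
// Hypothetical helper for a back-of-the-envelope estimate; not Kafka code.
final class InFlightThroughput {
    /**
     * Upper bound on per-partition throughput in bytes/sec:
     * (in-flight requests * batch bytes) / latency.
     */
    static double maxBytesPerSecond(int inFlightRequests, int batchBytes, long latencyMs) {
        long inFlightBytes = (long) inFlightRequests * batchBytes;
        return inFlightBytes * 1000.0 / latencyMs;
    }

    public static void main(String[] args) {
        int[] batchSizes = {16 * 1024, 256 * 1024};
        long[] latenciesMs = {50, 100};
        for (int batchBytes : batchSizes) {
            for (long latencyMs : latenciesMs) {
                double mbPerSec = maxBytesPerSecond(5, batchBytes, latencyMs) / 1_000_000.0;
                System.out.printf("batch=%dKB latency=%dms -> ~%.1f MB/sec%n",
                        batchBytes / 1024, latencyMs, mbPerSec);
            }
        }
    }
}
```

With 5 in-flight requests and 16KB batches this gives roughly 1.6MB/sec at
50ms and 0.8MB/sec at 100ms, matching the numbers above.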

I think it would be great to separate the two sizes:

1. When appending records to a batch, create a new batch if the current
batch exceeds a larger size (we can call it batch.max.size), say 256KB by
default.
2. When we drain, consider a batch 'ready' if it exceeds batch.size, which
is 16KB by default.
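
A minimal sketch of the two separated checks, assuming "batch.max.size" is
the proposed config name and that a batch is also ready once linger.ms has
elapsed. The class and method names are hypothetical, and the exact
comparison operators are an assumption; this is not the accumulator's
actual code.

```java
// Hypothetical model of the proposed split between the two size limits.
final class BatchThresholds {
    private final int batchSize;    // drain threshold: batch is "ready"
    private final int batchMaxSize; // append threshold: start a new batch

    BatchThresholds(int batchSize, int batchMaxSize) {
        if (batchSize < 0 || batchMaxSize < batchSize) {
            throw new IllegalArgumentException("need 0 <= batch.size <= batch.max.size");
        }
        this.batchSize = batchSize;
        this.batchMaxSize = batchMaxSize;
    }

    /** Append side: a new batch is needed only when the record would not fit under batch.max.size. */
    boolean needsNewBatch(int currentBytes, int recordBytes) {
        return currentBytes + recordBytes > batchMaxSize;
    }

    /** Drain side: ready once batch.size is reached or the batch has waited linger.ms. */
    boolean isReady(int currentBytes, long waitedMs, long lingerMs) {
        return currentBytes >= batchSize || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        BatchThresholds t = new BatchThresholds(16 * 1024, 256 * 1024);
        // A 17,000-byte batch is ready to send, yet can still accept more records.
        System.out.println("ready=" + t.isReady(17_000, 0, 5));
        System.out.println("needsNewBatch=" + t.needsNewBatch(17_000, 1_000));
    }
}
```

The point of the split is that a batch past batch.size can be sent as soon
as the sender is able to, but keeps absorbing records until then instead of
forcing a new 16KB batch for every overflow.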

For memory conservation we may introduce batch.initial.size if we want the
flexibility to make the initial size even smaller than batch.size, or we
can just always use batch.size as the initial size (in which case we don't
need the batch.initial.size config).

-Artem

On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <sh...@gmail.com> wrote:

> Hi Kafka dev,
> I'd like to start a vote for the proposal: KIP-782: Expandable batch size
> in producer.
>
> The main purpose for this KIP is to have better memory usage in producer,
> and also save users from the dilemma while setting the batch size
> configuration. After this KIP, users can set a higher batch.size without
> worries, and of course, with an appropriate "batch.initial.size".
>
> Detailed description can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>
> Any comments and feedback are welcome.
>
> Thank you.
> Luke
>