You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Jun Rao <ju...@confluent.io.INVALID> on 2021/12/08 01:28:18 UTC

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Hi, Luke,

Thanks for the KIP.  A few comments below.

10. Accumulating small batches could improve memory usage. Will that
introduce extra copying when generating a produce request? Currently, a
produce request takes a single MemoryRecords per partition.
11. Do we need to introduce a new config batch.max.size? Could we just
increase the default of batch.size? We probably need to have KIP-794
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner>
resolved
before increasing the default batch size since the larger the batch size,
the worse the problem in KIP-794.
12. As for max.request.size, currently it's used for both the max record
size and the max request size, which is unintuitive. Perhaps we could
introduce a new config max.record.size that defaults to 1MB. We could then
increase max.request.size to sth like 10MB.

Thanks,

Jun


On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
<al...@confluent.io.invalid> wrote:

> Hi Luke,
>
> I don't mind increasing the max.request.size to a higher number, e.g. 2MB
> could be good.  I think we should also run some benchmarks to see the
> effects of different sizes.
>
> I agree that changing round robin to random solves an independent existing
> issue, however the logic in this KIP exacerbates the issue, so there is
> some dependency.
>
> -Artem
>
> On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com> wrote:
>
> > Hi Artem,
> > Yes, I agree if we go with random selection instead of round-robin
> > selection, the latency issue will be more fair. That is, if there are 10
> > partitions, the 10th partition will always be the last choice in each
> round
> > in current design, but with random selection, the chance to be selected
> is
> > more fair.
> >
> > However, I think that's kind of out of scope with this KIP. This is an
> > existing issue, and it might need further discussion to decide if this
> > change is necessary.
> >
> > I agree the default 32KB for "batch.max.size" might be not huge
> improvement
> > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > "batch.max.size", and make the documentation clear that if the
> > "batch.max.size"
> > is increased, there might be chances that the "ready" partitions need to
> > wait for next request to send to broker, because of the
> "max.request.size"
> > (default 1MB) limitation. "max.request.size" can also be considered to
> > increase to avoid this issue. What do you think?
> >
> > Thank you.
> > Luke
> >
> > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > <al...@confluent.io.invalid> wrote:
> >
> > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > >
> > > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> > > partition.  We should probably set up some testing and see if 256KB has
> > > problems.
> > >
> > > To illustrate latency dynamics, let's consider a simplified model: 1
> > > in-flight request per broker, produce latency 125ms, 256KB max request
> > > size, 16 partitions assigned to the same broker, every second 128KB is
> > > produced to each partition (total production rate is 2MB/sec).
> > >
> > > If the batch size is 16KB, then the pattern would be the following:
> > >
> > > 0ms - produce 128KB into each partition
> > > 0ms - take 16KB from each partition send (total 256KB)
> > > 125ms - complete first 16KB from each partition, send next 16KB
> > > 250ms - complete second 16KB, send next 16KB
> > > ...
> > > 1000ms - complete 8th 16KB from each partition
> > >
> > > from this model it's easy to see that there are 256KB that are sent
> > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > 875ms.
> > >
> > > If the batch size is 256KB, then the pattern would be the following:
> > >
> > > 0ms - produce 128KB into each partition
> > > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > > 125ms - complete 2 first partitions, send data from next 2 partitions
> > > ...
> > > 1000ms - complete last 2 partitions
> > >
> > > even though the pattern is different, there are still 256KB that are
> sent
> > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > 875ms.
> > >
> > > Now, in this example if we do strictly round-robin (current
> > implementation)
> > > and we have this exact pattern (not sure how often such regular pattern
> > > would happen in practice -- I would expect that it would be a bit more
> > > random), some partitions would experience higher latency than others
> (not
> > > sure how much it would matter in practice -- in the end of the day some
> > > bytes produced to a topic would have higher latency and some bytes
> would
> > > have lower latency).  This pattern is easily fixed by choosing the next
> > > partition randomly instead of using round-robin.
> > >
> > > -Artem
> > >
> > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <sh...@gmail.com> wrote:
> > >
> > > > Hi Tom,
> > > > Thanks for your comments. And thanks for Artem's explanation.
> > > > Below is my response:
> > > >
> > > > > Currently because buffers are allocated using batch.size it means
> we
> > > can
> > > > handle records that are that large (e.g. one big record per batch).
> > > Doesn't
> > > > the introduction of smaller buffer sizes (batch.initial.size) mean a
> > > > corresponding decrease in the maximum record size that the producer
> can
> > > > handle?
> > > >
> > > > Actually, the "batch.size" is only like a threshold to decide if the
> > > batch
> > > > is "ready to be sent". That is, even if you set the "batch.size=16KB"
> > > > (default value), users can still send one record sized with 20KB, as
> > long
> > > > as the size is less than "max.request.size" in producer (default
> 1MB).
> > > > Therefore, the introduction of "batch.initial.size" won't decrease
> the
> > > > maximum record size that the producer can handle.
> > > >
> > > > > But isn't there the risk that drainBatchesForOneNode would end up
> not
> > > > sending ready
> > > > batches well past when they ought to be sent (according to their
> > > linger.ms
> > > > ),
> > > > because it's sending buffers for earlier partitions too aggressively?
> > > >
> > > > Did you mean that we have a "max.request.size" per request (default
> is
> > > > 1MB), and before this KIP, the request can include 64 batches in
> single
> > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be able to
> > > > include 32 batches or less, because we aggressively sent more records
> > in
> > > > one batch, is that what you meant? That's a really good point that
> I've
> > > > never thought about. I think your suggestion to go through other
> > > partitions
> > > > that just fit "batch.size", or expire "linger.ms" first, before
> > handling
> > > > the one that is > "batch.size" limit is not a good way, because it
> > might
> > > > cause the one with size > "batch.size" always in the lowest priority,
> > and
> > > > cause starving issue that the batch won't have chance to get sent.
> > > >
> > > > I don't have better solution for it, but maybe I can firstly decrease
> > the
> > > > "batch.max.size" to 32KB, instead of aggressively 256KB in the KIP.
> > That
> > > > should alleviate the problem. And still improve the throughput. What
> do
> > > you
> > > > think?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > <al...@confluent.io.invalid> wrote:
> > > >
> > > > > > I think this KIP would change the behaviour of producers when
> there
> > > are
> > > > > multiple partitions ready to be sent
> > > > >
> > > > > This is correct, the pattern changes and becomes more
> coarse-grained.
> > > > But
> > > > > I don't think it changes fairness over the long run.  I think it's
> a
> > > good
> > > > > idea to change drainIndex to be random rather than round robin to
> > avoid
> > > > > forming patterns where some partitions would consistently get
> higher
> > > > > latencies than others because they wait longer for their turn.
> > > > >
> > > > > If we really wanted to preserve the exact patterns, we could either
> > try
> > > > to
> > > > > support multiple 16KB batches from one partition per request
> > (probably
> > > > > would require protocol change to change logic on the broker for
> > > duplicate
> > > > > detection) or try to re-batch 16KB batches from accumulator into
> > larger
> > > > > batches during send (additional computations) or try to consider
> all
> > > > > partitions assigned to a broker to check if a new batch needs to be
> > > > created
> > > > > (i.e. compare cumulative batch size from all partitions assigned
> to a
> > > > > broker and create new batch when cumulative size is 1MB, more
> > complex).
> > > > >
> > > > > Overall, it seems like just increasing the max batch size is a
> > simpler
> > > > > solution and it does favor larger batch sizes, which is beneficial
> > not
> > > > just
> > > > > for production.
> > > > >
> > > > > > ready batches well past when they ought to be sent (according to
> > > their
> > > > > linger.ms)
> > > > >
> > > > > The trigger for marking batches ready to be sent isn't changed - a
> > > batch
> > > > is
> > > > > ready to be sent once it reaches 16KB, so by the time larger
> batches
> > > > start
> > > > > forming, linger.ms wouldn't matter much because the batching goal
> is
> > > met
> > > > > and the batch can be sent immediately.  Larger batches start
> forming
> > > once
> > > > > the client starts waiting for the server, in which case some data
> > will
> > > > wait
> > > > > its turn to be sent.  This will happen for some data regardless of
> > how
> > > we
> > > > > pick data to send, the question is just whether we'd have some
> > > scenarios
> > > > > where some partitions would consistently experience higher latency
> > than
> > > > > others.  I think picking drainIndex randomly would prevent such
> > > > scenarios.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tb...@redhat.com>
> > > wrote:
> > > > >
> > > > > > Hi Luke,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > Currently because buffers are allocated using batch.size it means
> > we
> > > > can
> > > > > > handle records that are that large (e.g. one big record per
> batch).
> > > > > Doesn't
> > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> mean
> > a
> > > > > > corresponding decrease in the maximum record size that the
> producer
> > > can
> > > > > > handle? That might not be a problem if the user knows their
> maximum
> > > > > record
> > > > > > size and has tuned batch.initial.size accordingly, but if the
> > default
> > > > for
> > > > > > batch.initial.size < batch.size it could cause regressions for
> > > existing
> > > > > > users with a large record size, I think. It should be enough for
> > > > > > batch.initial.size to default to batch.size, allowing users who
> > care
> > > > > about
> > > > > > the memory saving in the off-peak throughput case to do the
> tuning,
> > > but
> > > > > not
> > > > > > causing a regression for existing users.
> > > > > >
> > > > > > I think this KIP would change the behaviour of producers when
> there
> > > are
> > > > > > multiple partitions ready to be sent: By sending all the ready
> > > buffers
> > > > > > (which may now be > batch.size) for the first partition, we could
> > end
> > > > up
> > > > > > excluding ready buffers for other partitions from the current
> send.
> > > In
> > > > > > other words, as I understand the KIP currently, there's a change
> in
> > > > > > fairness. I think the code in
> > > RecordAccumulator#drainBatchesForOneNode
> > > > > will
> > > > > > ensure fairness in the long run, because the drainIndex will
> ensure
> > > > that
> > > > > > those other partitions each get their turn at being the first.
> But
> > > > isn't
> > > > > > there the risk that drainBatchesForOneNode would end up not
> sending
> > > > ready
> > > > > > batches well past when they ought to be sent (according to their
> > > > > linger.ms
> > > > > > ),
> > > > > > because it's sending buffers for earlier partitions too
> > aggressively?
> > > > Or,
> > > > > > to put it another way, perhaps the RecordAccumulator should
> > > round-robin
> > > > > the
> > > > > > ready buffers for _all_ the partitions before trying to fill the
> > > > > remaining
> > > > > > space with the extra buffers (beyond the batch.size limit) for
> the
> > > > first
> > > > > > partitions?
> > > > > >
> > > > > > Kind regards,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <sh...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Hi Ismael and all devs,
> > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > If no, I'm going to update the KIP based on my previous mail,
> and
> > > > > start a
> > > > > > > vote tomorrow or next week.
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <sh...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > Hi Ismael,
> > > > > > > > Thanks for your comments.
> > > > > > > >
> > > > > > > > 1. Why do we have to reallocate the buffer? We can keep a
> list
> > of
> > > > > > buffers
> > > > > > > > instead and avoid reallocation.
> > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > "buffer.initial.size",
> > > > > > > > and link them together (with linked list)?
> > > > > > > > ex:
> > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > | 4KB |
> > > > > > > >
> > > > > > > > b. when new records reached and the remaining buffer is not
> > > enough
> > > > > for
> > > > > > > the
> > > > > > > > records, we create another batch with "batch.initial.size"
> > buffer
> > > > > > > > ex: we already have 3KB of data in the 1st buffer, and here
> > comes
> > > > the
> > > > > > 2KB
> > > > > > > > record
> > > > > > > >
> > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > now, record: 2KB coming
> > > > > > > > We fill the 1st 1KB into 1st buffer, and create new buffer,
> and
> > > > > linked
> > > > > > > > together, and fill the rest of data into it
> > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > >
> > > > > > > > Is that what you mean?
> > > > > > > > If so, I think I like this idea!
> > > > > > > > If not, please explain more detail about it.
> > > > > > > > Thank you.
> > > > > > > >
> > > > > > > > 2. I think we should also consider tweaking the semantics of
> > > > > batch.size
> > > > > > > so
> > > > > > > > that the sent batches can be larger if the batch is not ready
> > to
> > > be
> > > > > > sent
> > > > > > > > (while still respecting max.request.size and perhaps a new
> > > > > > > max.batch.size).
> > > > > > > >
> > > > > > > > --> In the KIP, I was trying to make the "batch.size" as the
> > > upper
> > > > > > bound
> > > > > > > > of the batch size, and introduce a "batch.initial.size" as
> > > initial
> > > > > > batch
> > > > > > > > size.
> > > > > > > > So are you saying that we can let "batch.size" as initial
> batch
> > > > size
> > > > > > and
> > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > That's a good suggestion, but that would change the semantics
> > of
> > > > > > > > "batch.size", which might surprise some users. I think my
> > > original
> > > > > > > proposal
> > > > > > > > ("batch.initial.size") is safer for users. What do you think?
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> ismael@juma.me.uk
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > >> I think we should also consider tweaking the semantics of
> > > > batch.size
> > > > > > so
> > > > > > > >> that the sent batches can be larger if the batch is not
> ready
> > to
> > > > be
> > > > > > sent
> > > > > > > >> (while still respecting max.request.size and perhaps a new
> > > > > > > >> max.batch.size).
> > > > > > > >>
> > > > > > > >> Ismael
> > > > > > > >>
> > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> ismael@juma.me.uk
> > >
> > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hi Luke,
> > > > > > > >> >
> > > > > > > >> > Thanks for the KIP. Why do we have to reallocate the
> buffer?
> > > We
> > > > > can
> > > > > > > >> keep a
> > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > >> >
> > > > > > > >> > Ismael
> > > > > > > >> >
> > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> showuon@gmail.com>
> > > > > wrote:
> > > > > > > >> >
> > > > > > > >> >> Hi Kafka dev,
> > > > > > > >> >> I'd like to start the discussion for the proposal:
> KIP-782:
> > > > > > > Expandable
> > > > > > > >> >> batch size in producer.
> > > > > > > >> >>
> > > > > > > >> >> The main purpose for this KIP is to have better memory
> > usage
> > > in
> > > > > > > >> producer,
> > > > > > > >> >> and also save users from the dilemma while setting the
> > batch
> > > > size
> > > > > > > >> >> configuration. After this KIP, users can set a higher
> > > > batch.size
> > > > > > > >> without
> > > > > > > >> >> worries, and of course, with an appropriate
> > > > "batch.initial.size"
> > > > > > and
> > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > >> >>
> > > > > > > >> >> Derailed description can be found here:
> > > > > > > >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > >> >>
> > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > >> >>
> > > > > > > >> >> Thank you.
> > > > > > > >> >> Luke
> > > > > > > >> >>
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Jack Vanlightly <va...@apache.org>.
The current configs are hard to use for the Kafka user and a little inflexible so I am pleased to see the discussion.

Ultimately we want flexibility. We don't want to force users to understand the underlying implementation/protocol and we want the producer to handle high or low throughput efficiently - with the right amount of inflight data for the conditions.

Given each producer by default has limited inflight requests per connection then we need extra flexibility in the batch sizing.

The way I would like to be able to approach as a user is that 1) I use linger.ms to bound my client-side produce latency 2) I use request.size and max.inflight.requests.per.connection to calculate my upper-bound inflight data. As a user, I don't have to think about anything else (no edge cases or caveats). The producer handles the batching intelligently, trying to fully utilise the request sizes while respecting the linger.

I understand Jun's point about increasing the default batch size. The only downside I see is having to juggle the request vs batch size to avoid delaying batches because we can only fit so many big batches in a request. That is a calculation we force on the user and reduces flexibility.

As I understand the proposed change, each ProducerBatch would be physically made up of a list of ByteBuffers that allows for dynamically increasing the batch size in increments of batch.initial.size.

The documented benefit is that we can send large batches when there is lots of data ready to be sent, while also not having to wait for linger.ms when there is less data. Comparing this to Jun's suggested approach, it looks the same except that we get size triggered batches instead of linger when throughput is low. How big an advantage is that?

An alternative dynamic sizing strategy is one based on fairness. Batches are dynamically sized in a fair way that more fairly distributes the data of each partition across the request.

Messages are added to small sub-batches (say 4kb or 16kb) in the accumulator. When draining the sub-batches in the Sender, the logic selects sub-batches by creation order repeatedly until constraints are met (like request.size). All sub-batches of the same partition are grouped into a single batch for sending. 

This way the user can set a high batch.size (could even default it to request size) as the final batches will be fairly distributed across partitions and we should get good utilisation of the request.size (and still respecting linger).

Example with request.size=128kb, 16kb sub-batches, 1 broker and picking sub-batches in creation order. None have reached linger in this example for simplicity.

Accumulator
p0: [sb1{t1},sb2{t2},sb3{t3},sb4{t4},sb5{t5},sb6{t6},sb7{t7},sb8{t8}]
p1: [sb1{t1}]
p2: [sb1{t3}]
p3: [sb1{t7}]

Request 1
p0: [sb1-sb5]
p1: [sb1]
p2: [sb1]

Request 2 (with no additional data having had arrived)
p0: [sb6-sb8]
p3: [sb1]

The downsides are a decent refactoring.
There would need to be changes to the housekeeping of how callbacks are managed for example. There might be more impact that I am not aware of, I'm fairly new to this code.

Thanks
Jack

On 2021/12/13 19:15:37 Jun Rao wrote:
> Hi, Lucas,
> 
> Thanks for the reply. It would be useful to summarize the benefits of a
> separate batch.max.size. To me, it's not clear why a user would want two
> different batch sizes. In your example, I can understand why a user would
> want to form a batch with a 5ms linger. But why would a user prefer 16K
> batches with 5ms linger, if say 256K is deemed best for throughput?
> 
> Thanks,
> 
> Jun
> 
> On Fri, Dec 10, 2021 at 4:35 PM Lucas Bradstreet <lu...@confluent.io.invalid>
> wrote:
> 
> > Hi Jun,
> >
> > One difference compared to increasing the default batch size is that users
> > may actually prefer smaller batches but it makes much less sense to
> > accumulate many small batches if a batch is already sending.
> >
> > For example, imagine a user that prefer 16K batches with 5ms linger.
> > Everything is functioning normally and 16KB batches are being sent. Then
> > there's a 500ms blip for that broker. Do we want to continue to accumulate
> > 16KB batches, each of which requires a round trip, or would we prefer to
> > accumulate larger batches while sending is blocked?
> >
> > I'm not hugely against increasing the default batch.size in general, but
> > batch.max.size does seem to have some nice properties.
> >
> > Thanks,
> >
> > Lucas
> >
> > On Fri, Dec 10, 2021 at 9:42 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> >
> > > Hi, Artem, Luke,
> > >
> > > Thanks for the reply.
> > >
> > > 11. If we get rid of batch.max.size and increase the default batch.size,
> > > it's true the behavior is slightly different than before. However, does
> > > that difference matter to most of our users? In your example, if a user
> > > sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
> > > matter to deliver any batch smaller than 256KB before 100ms? I also find
> > it
> > > a bit hard to explain to our users these 3 different settings related to
> > > batch size.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Dec 9, 2021 at 5:47 AM Luke Chen <sh...@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 11. In addition to Artem's comment, I think the reason to have
> > additional
> > > > "batch.max.size" is to have more flexibility to users.
> > > > For example:
> > > > With linger.ms=100ms, batch.size=16KB, now, we have 20KB of data
> > coming
> > > to
> > > > a partition within 50ms. Now, sender is ready to pick up the batch to
> > > send.
> > > > In current design, we send 16KB data to broker, and keep the remaining
> > > 4KB
> > > > in the producer, to keep accumulating data.
> > > > But after this KIP, user can send the whole 20KB of data together. That
> > > is,
> > > > user can decide if they want to accumulate more data before the sender
> > is
> > > > ready, and send them together, to have higher throughput. The
> > > > "batch.size=16KB" in the proposal, is more like a soft limit, (and
> > > > "batch.max.size" is like a hard limit), or it's like a switch to enable
> > > the
> > > > batch to become ready. Before sender is ready, we can still accumulate
> > > more
> > > > data, and wrap them together to send to broker.
> > > >
> > > > User can increase "batch.size" to 20KB to achieve the same goal in the
> > > > current design, of course. But you can imagine, if the data within
> > 100ms
> > > is
> > > > just 18KB, then the batch of data will wait for 100ms passed to be sent
> > > > out. This "batch.max.size" config will allow more flexible for user
> > > config.
> > > >
> > > > Does that make sense?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
> > > > <al...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 11. That was my initial thinking as well, but in a discussion some
> > > people
> > > > > pointed out the change of behavior in some scenarios.  E.g. if
> > someone
> > > > for
> > > > > some reason really wants batches to be at least 16KB and sets large
> > > > > linger.ms, and most of the time the batches are filled quickly
> > enough
> > > > and
> > > > > they observe a certain latency.  Then they upgrade their client with
> > a
> > > > > default size 256KB and the latency increases.  This could be seen as
> > a
> > > > > regression.  It could be fixed by just reducing linger.ms to specify
> > > the
> > > > > expected latency, but still could be seen as a disruption by some
> > > users.
> > > > > The other reason to have 2 sizes is to avoid allocating large buffers
> > > > > upfront.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <ju...@confluent.io.invalid>
> > > wrote:
> > > > >
> > > > > > Hi, Artem,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 11. Got it. To me, batch.size is really used for throughput and not
> > > for
> > > > > > latency guarantees. There is no guarantee when 16KB will be
> > > > accumulated.
> > > > > > So, if users want any latency guarantee, they will need to specify
> > > > > > linger.ms accordingly.
> > > > > > Then, batch.size can just be used to tune for throughput.
> > > > > >
> > > > > > 20. Could we also describe the unit of compression? Is
> > > > > > it batch.initial.size, batch.size or batch.max.size?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > > > > > <al...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > 10. My understanding is that MemoryRecords would under the covers
> > > be
> > > > > > > allocated in chunks, so logically it still would be one
> > > MemoryRecords
> > > > > > > object, it's just instead of allocating one large chunk upfront,
> > > > > smaller
> > > > > > > chunks are allocated as needed to grow the batch and linked into
> > a
> > > > > list.
> > > > > > >
> > > > > > > 11. The reason for 2 sizes is to avoid change of behavior when
> > > > > triggering
> > > > > > > batch send with large linger.ms.  Currently, a batch send is
> > > > triggered
> > > > > > > once
> > > > > > > the batch reaches 16KB by default, if we just raise the default
> > to
> > > > > 256KB,
> > > > > > > then the batch send will be delayed.  Using a separate value
> > would
> > > > > allow
> > > > > > > keeping the current behavior when sending the batch out, but
> > > provide
> > > > > > better
> > > > > > > throughput with high latency + high bandwidth channels.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <jun@confluent.io.invalid
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Luke,
> > > > > > > >
> > > > > > > > Thanks for the KIP.  A few comments below.
> > > > > > > >
> > > > > > > > 10. Accumulating small batches could improve memory usage. Will
> > > > that
> > > > > > > > introduce extra copying when generating a produce request?
> > > > > Currently, a
> > > > > > > > produce request takes a single MemoryRecords per partition.
> > > > > > > > 11. Do we need to introduce a new config batch.max.size? Could
> > we
> > > > > just
> > > > > > > > increase the default of batch.size? We probably need to have
> > > > KIP-794
> > > > > > > > <
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > >
> > > > > > > > resolved
> > > > > > > > before increasing the default batch size since the larger the
> > > batch
> > > > > > size,
> > > > > > > > the worse the problem in KIP-794.
> > > > > > > > 12. As for max.request.size, currently it's used for both the
> > max
> > > > > > record
> > > > > > > > size and the max request size, which is unintuitive. Perhaps we
> > > > could
> > > > > > > > introduce a new config max.record.size that defaults to 1MB. We
> > > > could
> > > > > > > then
> > > > > > > > increase max.request.size to sth like 10MB.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Luke,
> > > > > > > > >
> > > > > > > > > I don't mind increasing the max.request.size to a higher
> > > number,
> > > > > e.g.
> > > > > > > 2MB
> > > > > > > > > could be good.  I think we should also run some benchmarks to
> > > see
> > > > > the
> > > > > > > > > effects of different sizes.
> > > > > > > > >
> > > > > > > > > I agree that changing round robin to random solves an
> > > independent
> > > > > > > > existing
> > > > > > > > > issue, however the logic in this KIP exacerbates the issue,
> > so
> > > > > there
> > > > > > is
> > > > > > > > > some dependency.
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <
> > showuon@gmail.com>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Artem,
> > > > > > > > > > Yes, I agree if we go with random selection instead of
> > > > > round-robin
> > > > > > > > > > selection, the latency issue will be more fair. That is, if
> > > > there
> > > > > > are
> > > > > > > > 10
> > > > > > > > > > partitions, the 10th partition will always be the last
> > choice
> > > > in
> > > > > > each
> > > > > > > > > round
> > > > > > > > > > in current design, but with random selection, the chance to
> > > be
> > > > > > > selected
> > > > > > > > > is
> > > > > > > > > > more fair.
> > > > > > > > > >
> > > > > > > > > > However, I think that's kind of out of scope with this KIP.
> > > > This
> > > > > is
> > > > > > > an
> > > > > > > > > > existing issue, and it might need further discussion to
> > > decide
> > > > if
> > > > > > > this
> > > > > > > > > > change is necessary.
> > > > > > > > > >
> > > > > > > > > > I agree the default 32KB for "batch.max.size" might be not
> > > huge
> > > > > > > > > improvement
> > > > > > > > > > compared with 256KB. I'm thinking, maybe default to "64KB"
> > > for
> > > > > > > > > > "batch.max.size", and make the documentation clear that if
> > > the
> > > > > > > > > > "batch.max.size"
> > > > > > > > > > is increased, there might be chances that the "ready"
> > > > partitions
> > > > > > need
> > > > > > > > to
> > > > > > > > > > wait for next request to send to broker, because of the
> > > > > > > > > "max.request.size"
> > > > > > > > > > (default 1MB) limitation. "max.request.size" can also be
> > > > > considered
> > > > > > > to
> > > > > > > > > > increase to avoid this issue. What do you think?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > >  maybe I can firstly decrease the "batch.max.size" to
> > > 32KB
> > > > > > > > > > >
> > > > > > > > > > > I think 32KB is too small.  With 5 in-flight and 100ms
> > > > latency
> > > > > we
> > > > > > > can
> > > > > > > > > > > produce 1.6MB/s per partition.  With 256KB we can produce
> > > > > > 12.8MB/s
> > > > > > > > per
> > > > > > > > > > > partition.  We should probably set up some testing and
> > see
> > > if
> > > > > > 256KB
> > > > > > > > has
> > > > > > > > > > > problems.
> > > > > > > > > > >
> > > > > > > > > > > To illustrate latency dynamics, let's consider a
> > simplified
> > > > > > model:
> > > > > > > 1
> > > > > > > > > > > in-flight request per broker, produce latency 125ms,
> > 256KB
> > > > max
> > > > > > > > request
> > > > > > > > > > > size, 16 partitions assigned to the same broker, every
> > > second
> > > > > > 128KB
> > > > > > > > is
> > > > > > > > > > > produced to each partition (total production rate is
> > > > 2MB/sec).
> > > > > > > > > > >
> > > > > > > > > > > If the batch size is 16KB, then the pattern would be the
> > > > > > following:
> > > > > > > > > > >
> > > > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > > > > > > > 125ms - complete first 16KB from each partition, send
> > next
> > > > 16KB
> > > > > > > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > > > > > > ...
> > > > > > > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > > > > > > >
> > > > > > > > > > > from this model it's easy to see that there are 256KB
> > that
> > > > are
> > > > > > sent
> > > > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that
> > > are
> > > > > > sent
> > > > > > > in
> > > > > > > > > > > 875ms.
> > > > > > > > > > >
> > > > > > > > > > > If the batch size is 256KB, then the pattern would be the
> > > > > > > following:
> > > > > > > > > > >
> > > > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > > > 0ms - take 128KB each from first 2 partitions and send
> > > (total
> > > > > > > 256KB)
> > > > > > > > > > > 125ms - complete 2 first partitions, send data from next
> > 2
> > > > > > > partitions
> > > > > > > > > > > ...
> > > > > > > > > > > 1000ms - complete last 2 partitions
> > > > > > > > > > >
> > > > > > > > > > > even though the pattern is different, there are still
> > 256KB
> > > > > that
> > > > > > > are
> > > > > > > > > sent
> > > > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that
> > > are
> > > > > > sent
> > > > > > > in
> > > > > > > > > > > 875ms.
> > > > > > > > > > >
> > > > > > > > > > > Now, in this example if we do strictly round-robin
> > (current
> > > > > > > > > > implementation)
> > > > > > > > > > > and we have this exact pattern (not sure how often such
> > > > regular
> > > > > > > > pattern
> > > > > > > > > > > would happen in practice -- I would expect that it would
> > > be a
> > > > > bit
> > > > > > > > more
> > > > > > > > > > > random), some partitions would experience higher latency
> > > than
> > > > > > > others
> > > > > > > > > (not
> > > > > > > > > > > sure how much it would matter in practice -- in the end
> > of
> > > > the
> > > > > > day
> > > > > > > > some
> > > > > > > > > > > bytes produced to a topic would have higher latency and
> > > some
> > > > > > bytes
> > > > > > > > > would
> > > > > > > > > > > have lower latency).  This pattern is easily fixed by
> > > > choosing
> > > > > > the
> > > > > > > > next
> > > > > > > > > > > partition randomly instead of using round-robin.
> > > > > > > > > > >
> > > > > > > > > > > -Artem
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <
> > > > showuon@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Tom,
> > > > > > > > > > > > Thanks for your comments. And thanks for Artem's
> > > > explanation.
> > > > > > > > > > > > Below is my response:
> > > > > > > > > > > >
> > > > > > > > > > > > > Currently because buffers are allocated using
> > > batch.size
> > > > it
> > > > > > > means
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > > handle records that are that large (e.g. one big record
> > > per
> > > > > > > batch).
> > > > > > > > > > > Doesn't
> > > > > > > > > > > > the introduction of smaller buffer sizes
> > > > (batch.initial.size)
> > > > > > > mean
> > > > > > > > a
> > > > > > > > > > > > corresponding decrease in the maximum record size that
> > > the
> > > > > > > producer
> > > > > > > > > can
> > > > > > > > > > > > handle?
> > > > > > > > > > > >
> > > > > > > > > > > > Actually, the "batch.size" is only like a threshold to
> > > > decide
> > > > > > if
> > > > > > > > the
> > > > > > > > > > > batch
> > > > > > > > > > > > is "ready to be sent". That is, even if you set the
> > > > > > > > "batch.size=16KB"
> > > > > > > > > > > > (default value), users can still send one record sized
> > > with
> > > > > > 20KB,
> > > > > > > > as
> > > > > > > > > > long
> > > > > > > > > > > > as the size is less than "max.request.size" in producer
> > > > > > (default
> > > > > > > > > 1MB).
> > > > > > > > > > > > Therefore, the introduction of "batch.initial.size"
> > won't
> > > > > > > decrease
> > > > > > > > > the
> > > > > > > > > > > > maximum record size that the producer can handle.
> > > > > > > > > > > >
> > > > > > > > > > > > > But isn't there the risk that drainBatchesForOneNode
> > > > would
> > > > > > end
> > > > > > > up
> > > > > > > > > not
> > > > > > > > > > > > sending ready
> > > > > > > > > > > > batches well past when they ought to be sent (according
> > > to
> > > > > > their
> > > > > > > > > > > linger.ms
> > > > > > > > > > > > ),
> > > > > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > > > > aggressively?
> > > > > > > > > > > >
> > > > > > > > > > > > Did you mean that we have a "max.request.size" per
> > > request
> > > > > > > (default
> > > > > > > > > is
> > > > > > > > > > > > 1MB), and before this KIP, the request can include 64
> > > > batches
> > > > > > in
> > > > > > > > > single
> > > > > > > > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we
> > > might
> > > > be
> > > > > > > able
> > > > > > > > to
> > > > > > > > > > > > include 32 batches or less, because we aggressively
> > sent
> > > > more
> > > > > > > > records
> > > > > > > > > > in
> > > > > > > > > > > > one batch, is that what you meant? That's a really good
> > > > point
> > > > > > > that
> > > > > > > > > I've
> > > > > > > > > > > > never thought about. I think your suggestion to go
> > > through
> > > > > > other
> > > > > > > > > > > partitions
> > > > > > > > > > > > that just fit "batch.size", or expire "linger.ms"
> > first,
> > > > > > before
> > > > > > > > > > handling
> > > > > > > > > > > > the one that is > "batch.size" limit is not a good way,
> > > > > because
> > > > > > > it
> > > > > > > > > > might
> > > > > > > > > > > > cause the one with size > "batch.size" always in the
> > > lowest
> > > > > > > > priority,
> > > > > > > > > > and
> > > > > > > > > > > > cause starving issue that the batch won't have chance
> > to
> > > > get
> > > > > > > sent.
> > > > > > > > > > > >
> > > > > > > > > > > > I don't have better solution for it, but maybe I can
> > > > firstly
> > > > > > > > decrease
> > > > > > > > > > the
> > > > > > > > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB
> > > in
> > > > > the
> > > > > > > KIP.
> > > > > > > > > > That
> > > > > > > > > > > > should alleviate the problem. And still improve the
> > > > > throughput.
> > > > > > > > What
> > > > > > > > > do
> > > > > > > > > > > you
> > > > > > > > > > > > think?
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you.
> > > > > > > > > > > > Luke
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > > I think this KIP would change the behaviour of
> > > > producers
> > > > > > when
> > > > > > > > > there
> > > > > > > > > > > are
> > > > > > > > > > > > > multiple partitions ready to be sent
> > > > > > > > > > > > >
> > > > > > > > > > > > > This is correct, the pattern changes and becomes more
> > > > > > > > > coarse-grained.
> > > > > > > > > > > > But
> > > > > > > > > > > > > I don't think it changes fairness over the long
> > run.  I
> > > > > think
> > > > > > > > it's
> > > > > > > > > a
> > > > > > > > > > > good
> > > > > > > > > > > > > idea to change drainIndex to be random rather than
> > > round
> > > > > > robin
> > > > > > > to
> > > > > > > > > > avoid
> > > > > > > > > > > > > forming patterns where some partitions would
> > > consistently
> > > > > get
> > > > > > > > > higher
> > > > > > > > > > > > > latencies than others because they wait longer for
> > > their
> > > > > > turn.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If we really wanted to preserve the exact patterns,
> > we
> > > > > could
> > > > > > > > either
> > > > > > > > > > try
> > > > > > > > > > > > to
> > > > > > > > > > > > > support multiple 16KB batches from one partition per
> > > > > request
> > > > > > > > > > (probably
> > > > > > > > > > > > > would require protocol change to change logic on the
> > > > broker
> > > > > > for
> > > > > > > > > > > duplicate
> > > > > > > > > > > > > detection) or try to re-batch 16KB batches from
> > > > accumulator
> > > > > > > into
> > > > > > > > > > larger
> > > > > > > > > > > > > batches during send (additional computations) or try
> > to
> > > > > > > consider
> > > > > > > > > all
> > > > > > > > > > > > > partitions assigned to a broker to check if a new
> > batch
> > > > > needs
> > > > > > > to
> > > > > > > > be
> > > > > > > > > > > > created
> > > > > > > > > > > > > (i.e. compare cumulative batch size from all
> > partitions
> > > > > > > assigned
> > > > > > > > > to a
> > > > > > > > > > > > > broker and create new batch when cumulative size is
> > > 1MB,
> > > > > more
> > > > > > > > > > complex).
> > > > > > > > > > > > >
> > > > > > > > > > > > > Overall, it seems like just increasing the max batch
> > > size
> > > > > is
> > > > > > a
> > > > > > > > > > simpler
> > > > > > > > > > > > > solution and it does favor larger batch sizes, which
> > is
> > > > > > > > beneficial
> > > > > > > > > > not
> > > > > > > > > > > > just
> > > > > > > > > > > > > for production.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > ready batches well past when they ought to be sent
> > > > > > (according
> > > > > > > > to
> > > > > > > > > > > their
> > > > > > > > > > > > > linger.ms)
> > > > > > > > > > > > >
> > > > > > > > > > > > > The trigger for marking batches ready to be sent
> > isn't
> > > > > > changed
> > > > > > > -
> > > > > > > > a
> > > > > > > > > > > batch
> > > > > > > > > > > > is
> > > > > > > > > > > > > ready to be sent once it reaches 16KB, so by the time
> > > > > larger
> > > > > > > > > batches
> > > > > > > > > > > > start
> > > > > > > > > > > > > forming, linger.ms wouldn't matter much because the
> > > > > batching
> > > > > > > > goal
> > > > > > > > > is
> > > > > > > > > > > met
> > > > > > > > > > > > > and the batch can be sent immediately.  Larger
> > batches
> > > > > start
> > > > > > > > > forming
> > > > > > > > > > > once
> > > > > > > > > > > > > the client starts waiting for the server, in which
> > case
> > > > > some
> > > > > > > data
> > > > > > > > > > will
> > > > > > > > > > > > wait
> > > > > > > > > > > > > its turn to be sent.  This will happen for some data
> > > > > > regardless
> > > > > > > > of
> > > > > > > > > > how
> > > > > > > > > > > we
> > > > > > > > > > > > > pick data to send, the question is just whether we'd
> > > have
> > > > > > some
> > > > > > > > > > > scenarios
> > > > > > > > > > > > > where some partitions would consistently experience
> > > > higher
> > > > > > > > latency
> > > > > > > > > > than
> > > > > > > > > > > > > others.  I think picking drainIndex randomly would
> > > > prevent
> > > > > > such
> > > > > > > > > > > > scenarios.
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Artem
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> > > > > > > tbentley@redhat.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Luke,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Currently because buffers are allocated using
> > > > batch.size
> > > > > it
> > > > > > > > means
> > > > > > > > > > we
> > > > > > > > > > > > can
> > > > > > > > > > > > > > handle records that are that large (e.g. one big
> > > record
> > > > > per
> > > > > > > > > batch).
> > > > > > > > > > > > > Doesn't
> > > > > > > > > > > > > > the introduction of smaller buffer sizes
> > > > > > (batch.initial.size)
> > > > > > > > > mean
> > > > > > > > > > a
> > > > > > > > > > > > > > corresponding decrease in the maximum record size
> > > that
> > > > > the
> > > > > > > > > producer
> > > > > > > > > > > can
> > > > > > > > > > > > > > handle? That might not be a problem if the user
> > knows
> > > > > their
> > > > > > > > > maximum
> > > > > > > > > > > > > record
> > > > > > > > > > > > > > size and has tuned batch.initial.size accordingly,
> > > but
> > > > if
> > > > > > the
> > > > > > > > > > default
> > > > > > > > > > > > for
> > > > > > > > > > > > > > batch.initial.size < batch.size it could cause
> > > > > regressions
> > > > > > > for
> > > > > > > > > > > existing
> > > > > > > > > > > > > > users with a large record size, I think. It should
> > be
> > > > > > enough
> > > > > > > > for
> > > > > > > > > > > > > > batch.initial.size to default to batch.size,
> > allowing
> > > > > users
> > > > > > > who
> > > > > > > > > > care
> > > > > > > > > > > > > about
> > > > > > > > > > > > > > the memory saving in the off-peak throughput case
> > to
> > > do
> > > > > the
> > > > > > > > > tuning,
> > > > > > > > > > > but
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > causing a regression for existing users.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think this KIP would change the behaviour of
> > > > producers
> > > > > > when
> > > > > > > > > there
> > > > > > > > > > > are
> > > > > > > > > > > > > > multiple partitions ready to be sent: By sending
> > all
> > > > the
> > > > > > > ready
> > > > > > > > > > > buffers
> > > > > > > > > > > > > > (which may now be > batch.size) for the first
> > > > partition,
> > > > > we
> > > > > > > > could
> > > > > > > > > > end
> > > > > > > > > > > > up
> > > > > > > > > > > > > > excluding ready buffers for other partitions from
> > the
> > > > > > current
> > > > > > > > > send.
> > > > > > > > > > > In
> > > > > > > > > > > > > > other words, as I understand the KIP currently,
> > > > there's a
> > > > > > > > change
> > > > > > > > > in
> > > > > > > > > > > > > > fairness. I think the code in
> > > > > > > > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > > > > > > > will
> > > > > > > > > > > > > > ensure fairness in the long run, because the
> > > drainIndex
> > > > > > will
> > > > > > > > > ensure
> > > > > > > > > > > > that
> > > > > > > > > > > > > > those other partitions each get their turn at being
> > > the
> > > > > > > first.
> > > > > > > > > But
> > > > > > > > > > > > isn't
> > > > > > > > > > > > > > there the risk that drainBatchesForOneNode would
> > end
> > > up
> > > > > not
> > > > > > > > > sending
> > > > > > > > > > > > ready
> > > > > > > > > > > > > > batches well past when they ought to be sent
> > > (according
> > > > > to
> > > > > > > > their
> > > > > > > > > > > > > linger.ms
> > > > > > > > > > > > > > ),
> > > > > > > > > > > > > > because it's sending buffers for earlier partitions
> > > too
> > > > > > > > > > aggressively?
> > > > > > > > > > > > Or,
> > > > > > > > > > > > > > to put it another way, perhaps the
> > RecordAccumulator
> > > > > should
> > > > > > > > > > > round-robin
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > ready buffers for _all_ the partitions before
> > trying
> > > to
> > > > > > fill
> > > > > > > > the
> > > > > > > > > > > > > remaining
> > > > > > > > > > > > > > space with the extra buffers (beyond the batch.size
> > > > > limit)
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > > first
> > > > > > > > > > > > > > partitions?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Kind regards,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Tom
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <
> > > > > > showuon@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > > > > > > > If no, I'm going to update the KIP based on my
> > > > previous
> > > > > > > mail,
> > > > > > > > > and
> > > > > > > > > > > > > start a
> > > > > > > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> > > > > > > showuon@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1. Why do we have to reallocate the buffer? We
> > > can
> > > > > > keep a
> > > > > > > > > list
> > > > > > > > > > of
> > > > > > > > > > > > > > buffers
> > > > > > > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > > > > > > -> Do you mean we allocate multiple buffers
> > with
> > > > > > > > > > > > > "buffer.initial.size",
> > > > > > > > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > > > > > > > ex:
> > > > > > > > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > > > > > > > | 4KB |
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > b. when new records reached and the remaining
> > > > buffer
> > > > > is
> > > > > > > not
> > > > > > > > > > > enough
> > > > > > > > > > > > > for
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > records, we create another batch with
> > > > > > > "batch.initial.size"
> > > > > > > > > > buffer
> > > > > > > > > > > > > > > > ex: we already have 3KB of data in the 1st
> > > buffer,
> > > > > and
> > > > > > > here
> > > > > > > > > > comes
> > > > > > > > > > > > the
> > > > > > > > > > > > > > 2KB
> > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create
> > > new
> > > > > > > buffer,
> > > > > > > > > and
> > > > > > > > > > > > > linked
> > > > > > > > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2. I think we should also consider tweaking the
> > > > > > semantics
> > > > > > > > of
> > > > > > > > > > > > > batch.size
> > > > > > > > > > > > > > > so
> > > > > > > > > > > > > > > > that the sent batches can be larger if the
> > batch
> > > is
> > > > > not
> > > > > > > > ready
> > > > > > > > > > to
> > > > > > > > > > > be
> > > > > > > > > > > > > > sent
> > > > > > > > > > > > > > > > (while still respecting max.request.size and
> > > > perhaps
> > > > > a
> > > > > > > new
> > > > > > > > > > > > > > > max.batch.size).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > --> In the KIP, I was trying to make the
> > > > "batch.size"
> > > > > > as
> > > > > > > > the
> > > > > > > > > > > upper
> > > > > > > > > > > > > > bound
> > > > > > > > > > > > > > > > of the batch size, and introduce a
> > > > > "batch.initial.size"
> > > > > > > as
> > > > > > > > > > > initial
> > > > > > > > > > > > > > batch
> > > > > > > > > > > > > > > > size.
> > > > > > > > > > > > > > > > So are you saying that we can let "batch.size"
> > as
> > > > > > initial
> > > > > > > > > batch
> > > > > > > > > > > > size
> > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > introduce a "max.batch.size" as upper bound
> > > value?
> > > > > > > > > > > > > > > > That's a good suggestion, but that would change
> > > the
> > > > > > > > semantics
> > > > > > > > > > of
> > > > > > > > > > > > > > > > "batch.size", which might surprise some users.
> > I
> > > > > think
> > > > > > my
> > > > > > > > > > > original
> > > > > > > > > > > > > > > proposal
> > > > > > > > > > > > > > > > ("batch.initial.size") is safer for users. What
> > > do
> > > > > you
> > > > > > > > think?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > > > > > > > ismael@juma.me.uk
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> I think we should also consider tweaking the
> > > > > semantics
> > > > > > > of
> > > > > > > > > > > > batch.size
> > > > > > > > > > > > > > so
> > > > > > > > > > > > > > > >> that the sent batches can be larger if the
> > batch
> > > > is
> > > > > > not
> > > > > > > > > ready
> > > > > > > > > > to
> > > > > > > > > > > > be
> > > > > > > > > > > > > > sent
> > > > > > > > > > > > > > > >> (while still respecting max.request.size and
> > > > > perhaps a
> > > > > > > new
> > > > > > > > > > > > > > > >> max.batch.size).
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> Ismael
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > > > > > > > ismael@juma.me.uk
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > Thanks for the KIP. Why do we have to
> > > reallocate
> > > > > the
> > > > > > > > > buffer?
> > > > > > > > > > > We
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > > >> keep a
> > > > > > > > > > > > > > > >> > list of buffers instead and avoid
> > > reallocation.
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > Ismael
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > > > > > > > showuon@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > > > > > > >> >> I'd like to start the discussion for the
> > > > > proposal:
> > > > > > > > > KIP-782:
> > > > > > > > > > > > > > > Expandable
> > > > > > > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> The main purpose for this KIP is to have
> > > better
> > > > > > > memory
> > > > > > > > > > usage
> > > > > > > > > > > in
> > > > > > > > > > > > > > > >> producer,
> > > > > > > > > > > > > > > >> >> and also save users from the dilemma while
> > > > > setting
> > > > > > > the
> > > > > > > > > > batch
> > > > > > > > > > > > size
> > > > > > > > > > > > > > > >> >> configuration. After this KIP, users can
> > set
> > > a
> > > > > > higher
> > > > > > > > > > > > batch.size
> > > > > > > > > > > > > > > >> without
> > > > > > > > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > > > > > > > "batch.initial.size"
> > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >> Thank you.
> > > > > > > > > > > > > > > >> >> Luke
> > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Lucas,

Thanks for the reply. It would be useful to summarize the benefits of a
separate batch.max.size. To me, it's not clear why a user would want two
different batch sizes. In your example, I can understand why a user would
want to form a batch with a 5ms linger. But why would a user prefer 16K
batches with 5ms linger, if say 256K is deemed best for throughput?

Thanks,

Jun

On Fri, Dec 10, 2021 at 4:35 PM Lucas Bradstreet <lu...@confluent.io.invalid>
wrote:

> Hi Jun,
>
> One difference compared to increasing the default batch size is that users
> may actually prefer smaller batches but it makes much less sense to
> accumulate many small batches if a batch is already sending.
>
> For example, imagine a user that prefer 16K batches with 5ms linger.
> Everything is functioning normally and 16KB batches are being sent. Then
> there's a 500ms blip for that broker. Do we want to continue to accumulate
> 16KB batches, each of which requires a round trip, or would we prefer to
> accumulate larger batches while sending is blocked?
>
> I'm not hugely against increasing the default batch.size in general, but
> batch.max.size does seem to have some nice properties.
>
> Thanks,
>
> Lucas
>
> On Fri, Dec 10, 2021 at 9:42 AM Jun Rao <ju...@confluent.io.invalid> wrote:
>
> > Hi, Artem, Luke,
> >
> > Thanks for the reply.
> >
> > 11. If we get rid of batch.max.size and increase the default batch.size,
> > it's true the behavior is slightly different than before. However, does
> > that difference matter to most of our users? In your example, if a user
> > sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
> > matter to deliver any batch smaller than 256KB before 100ms? I also find
> it
> > a bit hard to explain to our users these 3 different settings related to
> > batch size.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Dec 9, 2021 at 5:47 AM Luke Chen <sh...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > 11. In addition to Artem's comment, I think the reason to have
> additional
> > > "batch.max.size" is to have more flexibility to users.
> > > For example:
> > > With linger.ms=100ms, batch.size=16KB, now, we have 20KB of data
> coming
> > to
> > > a partition within 50ms. Now, sender is ready to pick up the batch to
> > send.
> > > In current design, we send 16KB data to broker, and keep the remaining
> > 4KB
> > > in the producer, to keep accumulating data.
> > > But after this KIP, user can send the whole 20KB of data together. That
> > is,
> > > user can decide if they want to accumulate more data before the sender
> is
> > > ready, and send them together, to have higher throughput. The
> > > "batch.size=16KB" in the proposal, is more like a soft limit, (and
> > > "batch.max.size" is like a hard limit), or it's like a switch to enable
> > the
> > > batch to become ready. Before sender is ready, we can still accumulate
> > more
> > > data, and wrap them together to send to broker.
> > >
> > > User can increase "batch.size" to 20KB to achieve the same goal in the
> > > current design, of course. But you can imagine, if the data within
> 100ms
> > is
> > > just 18KB, then the batch of data will wait for 100ms passed to be sent
> > > out. This "batch.max.size" config will allow more flexible for user
> > config.
> > >
> > > Does that make sense?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
> > > <al...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 11. That was my initial thinking as well, but in a discussion some
> > people
> > > > pointed out the change of behavior in some scenarios.  E.g. if
> someone
> > > for
> > > > some reason really wants batches to be at least 16KB and sets large
> > > > linger.ms, and most of the time the batches are filled quickly
> enough
> > > and
> > > > they observe a certain latency.  Then they upgrade their client with
> a
> > > > default size 256KB and the latency increases.  This could be seen as
> a
> > > > regression.  It could be fixed by just reducing linger.ms to specify
> > the
> > > > expected latency, but still could be seen as a disruption by some
> > users.
> > > > The other reason to have 2 sizes is to avoid allocating large buffers
> > > > upfront.
> > > >
> > > > -Artem
> > > >
> > > > On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <ju...@confluent.io.invalid>
> > wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 11. Got it. To me, batch.size is really used for throughput and not
> > for
> > > > > latency guarantees. There is no guarantee when 16KB will be
> > > accumulated.
> > > > > So, if users want any latency guarantee, they will need to specify
> > > > > linger.ms accordingly.
> > > > > Then, batch.size can just be used to tune for throughput.
> > > > >
> > > > > 20. Could we also describe the unit of compression? Is
> > > > > it batch.initial.size, batch.size or batch.max.size?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > > > > <al...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > 10. My understanding is that MemoryRecords would under the covers
> > be
> > > > > > allocated in chunks, so logically it still would be one
> > MemoryRecords
> > > > > > object, it's just instead of allocating one large chunk upfront,
> > > > smaller
> > > > > > chunks are allocated as needed to grow the batch and linked into
> a
> > > > list.
> > > > > >
> > > > > > 11. The reason for 2 sizes is to avoid change of behavior when
> > > > triggering
> > > > > > batch send with large linger.ms.  Currently, a batch send is
> > > triggered
> > > > > > once
> > > > > > the batch reaches 16KB by default, if we just raise the default
> to
> > > > 256KB,
> > > > > > then the batch send will be delayed.  Using a separate value
> would
> > > > allow
> > > > > > keeping the current behavior when sending the batch out, but
> > provide
> > > > > better
> > > > > > throughput with high latency + high bandwidth channels.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <jun@confluent.io.invalid
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi, Luke,
> > > > > > >
> > > > > > > Thanks for the KIP.  A few comments below.
> > > > > > >
> > > > > > > 10. Accumulating small batches could improve memory usage. Will
> > > that
> > > > > > > introduce extra copying when generating a produce request?
> > > > Currently, a
> > > > > > > produce request takes a single MemoryRecords per partition.
> > > > > > > 11. Do we need to introduce a new config batch.max.size? Could
> we
> > > > just
> > > > > > > increase the default of batch.size? We probably need to have
> > > KIP-794
> > > > > > > <
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > >
> > > > > > > resolved
> > > > > > > before increasing the default batch size since the larger the
> > batch
> > > > > size,
> > > > > > > the worse the problem in KIP-794.
> > > > > > > 12. As for max.request.size, currently it's used for both the
> max
> > > > > record
> > > > > > > size and the max request size, which is unintuitive. Perhaps we
> > > could
> > > > > > > introduce a new config max.record.size that defaults to 1MB. We
> > > could
> > > > > > then
> > > > > > > increase max.request.size to sth like 10MB.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Luke,
> > > > > > > >
> > > > > > > > I don't mind increasing the max.request.size to a higher
> > number,
> > > > e.g.
> > > > > > 2MB
> > > > > > > > could be good.  I think we should also run some benchmarks to
> > see
> > > > the
> > > > > > > > effects of different sizes.
> > > > > > > >
> > > > > > > > I agree that changing round robin to random solves an
> > independent
> > > > > > > existing
> > > > > > > > issue, however the logic in this KIP exacerbates the issue,
> so
> > > > there
> > > > > is
> > > > > > > > some dependency.
> > > > > > > >
> > > > > > > > -Artem
> > > > > > > >
> > > > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <
> showuon@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Artem,
> > > > > > > > > Yes, I agree if we go with random selection instead of
> > > > round-robin
> > > > > > > > > selection, the latency issue will be more fair. That is, if
> > > there
> > > > > are
> > > > > > > 10
> > > > > > > > > partitions, the 10th partition will always be the last
> choice
> > > in
> > > > > each
> > > > > > > > round
> > > > > > > > > in current design, but with random selection, the chance to
> > be
> > > > > > selected
> > > > > > > > is
> > > > > > > > > more fair.
> > > > > > > > >
> > > > > > > > > However, I think that's kind of out of scope with this KIP.
> > > This
> > > > is
> > > > > > an
> > > > > > > > > existing issue, and it might need further discussion to
> > decide
> > > if
> > > > > > this
> > > > > > > > > change is necessary.
> > > > > > > > >
> > > > > > > > > I agree the default 32KB for "batch.max.size" might be not
> > huge
> > > > > > > > improvement
> > > > > > > > > compared with 256KB. I'm thinking, maybe default to "64KB"
> > for
> > > > > > > > > "batch.max.size", and make the documentation clear that if
> > the
> > > > > > > > > "batch.max.size"
> > > > > > > > > is increased, there might be chances that the "ready"
> > > partitions
> > > > > need
> > > > > > > to
> > > > > > > > > wait for next request to send to broker, because of the
> > > > > > > > "max.request.size"
> > > > > > > > > (default 1MB) limitation. "max.request.size" can also be
> > > > considered
> > > > > > to
> > > > > > > > > increase to avoid this issue. What do you think?
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > >  maybe I can firstly decrease the "batch.max.size" to
> > 32KB
> > > > > > > > > >
> > > > > > > > > > I think 32KB is too small.  With 5 in-flight and 100ms
> > > latency
> > > > we
> > > > > > can
> > > > > > > > > > produce 1.6MB/s per partition.  With 256KB we can produce
> > > > > 12.8MB/s
> > > > > > > per
> > > > > > > > > > partition.  We should probably set up some testing and
> see
> > if
> > > > > 256KB
> > > > > > > has
> > > > > > > > > > problems.
> > > > > > > > > >
> > > > > > > > > > To illustrate latency dynamics, let's consider a
> simplified
> > > > > model:
> > > > > > 1
> > > > > > > > > > in-flight request per broker, produce latency 125ms,
> 256KB
> > > max
> > > > > > > request
> > > > > > > > > > size, 16 partitions assigned to the same broker, every
> > second
> > > > > 128KB
> > > > > > > is
> > > > > > > > > > produced to each partition (total production rate is
> > > 2MB/sec).
> > > > > > > > > >
> > > > > > > > > > If the batch size is 16KB, then the pattern would be the
> > > > > following:
> > > > > > > > > >
> > > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > > > > > > 125ms - complete first 16KB from each partition, send
> next
> > > 16KB
> > > > > > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > > > > > ...
> > > > > > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > > > > > >
> > > > > > > > > > from this model it's easy to see that there are 256KB
> that
> > > are
> > > > > sent
> > > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that
> > are
> > > > > sent
> > > > > > in
> > > > > > > > > > 875ms.
> > > > > > > > > >
> > > > > > > > > > If the batch size is 256KB, then the pattern would be the
> > > > > > following:
> > > > > > > > > >
> > > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > > 0ms - take 128KB each from first 2 partitions and send
> > (total
> > > > > > 256KB)
> > > > > > > > > > 125ms - complete 2 first partitions, send data from next
> 2
> > > > > > partitions
> > > > > > > > > > ...
> > > > > > > > > > 1000ms - complete last 2 partitions
> > > > > > > > > >
> > > > > > > > > > even though the pattern is different, there are still
> 256KB
> > > > that
> > > > > > are
> > > > > > > > sent
> > > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that
> > are
> > > > > sent
> > > > > > in
> > > > > > > > > > 875ms.
> > > > > > > > > >
> > > > > > > > > > Now, in this example if we do strictly round-robin
> (current
> > > > > > > > > implementation)
> > > > > > > > > > and we have this exact pattern (not sure how often such
> > > regular
> > > > > > > pattern
> > > > > > > > > > would happen in practice -- I would expect that it would
> > be a
> > > > bit
> > > > > > > more
> > > > > > > > > > random), some partitions would experience higher latency
> > than
> > > > > > others
> > > > > > > > (not
> > > > > > > > > > sure how much it would matter in practice -- in the end
> of
> > > the
> > > > > day
> > > > > > > some
> > > > > > > > > > bytes produced to a topic would have higher latency and
> > some
> > > > > bytes
> > > > > > > > would
> > > > > > > > > > have lower latency).  This pattern is easily fixed by
> > > choosing
> > > > > the
> > > > > > > next
> > > > > > > > > > partition randomly instead of using round-robin.
> > > > > > > > > >
> > > > > > > > > > -Artem
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <
> > > showuon@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Tom,
> > > > > > > > > > > Thanks for your comments. And thanks for Artem's
> > > explanation.
> > > > > > > > > > > Below is my response:
> > > > > > > > > > >
> > > > > > > > > > > > Currently because buffers are allocated using
> > batch.size
> > > it
> > > > > > means
> > > > > > > > we
> > > > > > > > > > can
> > > > > > > > > > > handle records that are that large (e.g. one big record
> > per
> > > > > > batch).
> > > > > > > > > > Doesn't
> > > > > > > > > > > the introduction of smaller buffer sizes
> > > (batch.initial.size)
> > > > > > mean
> > > > > > > a
> > > > > > > > > > > corresponding decrease in the maximum record size that
> > the
> > > > > > producer
> > > > > > > > can
> > > > > > > > > > > handle?
> > > > > > > > > > >
> > > > > > > > > > > Actually, the "batch.size" is only like a threshold to
> > > decide
> > > > > if
> > > > > > > the
> > > > > > > > > > batch
> > > > > > > > > > > is "ready to be sent". That is, even if you set the
> > > > > > > "batch.size=16KB"
> > > > > > > > > > > (default value), users can still send one record sized
> > with
> > > > > 20KB,
> > > > > > > as
> > > > > > > > > long
> > > > > > > > > > > as the size is less than "max.request.size" in producer
> > > > > (default
> > > > > > > > 1MB).
> > > > > > > > > > > Therefore, the introduction of "batch.initial.size"
> won't
> > > > > > decrease
> > > > > > > > the
> > > > > > > > > > > maximum record size that the producer can handle.
> > > > > > > > > > >
> > > > > > > > > > > > But isn't there the risk that drainBatchesForOneNode
> > > would
> > > > > end
> > > > > > up
> > > > > > > > not
> > > > > > > > > > > sending ready
> > > > > > > > > > > batches well past when they ought to be sent (according
> > to
> > > > > their
> > > > > > > > > > linger.ms
> > > > > > > > > > > ),
> > > > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > > > aggressively?
> > > > > > > > > > >
> > > > > > > > > > > Did you mean that we have a "max.request.size" per
> > request
> > > > > > (default
> > > > > > > > is
> > > > > > > > > > > 1MB), and before this KIP, the request can include 64
> > > batches
> > > > > in
> > > > > > > > single
> > > > > > > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we
> > might
> > > be
> > > > > > able
> > > > > > > to
> > > > > > > > > > > include 32 batches or less, because we aggressively
> sent
> > > more
> > > > > > > records
> > > > > > > > > in
> > > > > > > > > > > one batch, is that what you meant? That's a really good
> > > point
> > > > > > that
> > > > > > > > I've
> > > > > > > > > > > never thought about. I think your suggestion to go
> > through
> > > > > other
> > > > > > > > > > partitions
> > > > > > > > > > > that just fit "batch.size", or expire "linger.ms"
> first,
> > > > > before
> > > > > > > > > handling
> > > > > > > > > > > the one that is > "batch.size" limit is not a good way,
> > > > because
> > > > > > it
> > > > > > > > > might
> > > > > > > > > > > cause the one with size > "batch.size" always in the
> > lowest
> > > > > > > priority,
> > > > > > > > > and
> > > > > > > > > > > cause starving issue that the batch won't have chance
> to
> > > get
> > > > > > sent.
> > > > > > > > > > >
> > > > > > > > > > > I don't have better solution for it, but maybe I can
> > > firstly
> > > > > > > decrease
> > > > > > > > > the
> > > > > > > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB
> > in
> > > > the
> > > > > > KIP.
> > > > > > > > > That
> > > > > > > > > > > should alleviate the problem. And still improve the
> > > > throughput.
> > > > > > > What
> > > > > > > > do
> > > > > > > > > > you
> > > > > > > > > > > think?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > I think this KIP would change the behaviour of
> > > producers
> > > > > when
> > > > > > > > there
> > > > > > > > > > are
> > > > > > > > > > > > multiple partitions ready to be sent
> > > > > > > > > > > >
> > > > > > > > > > > > This is correct, the pattern changes and becomes more
> > > > > > > > coarse-grained.
> > > > > > > > > > > But
> > > > > > > > > > > > I don't think it changes fairness over the long
> run.  I
> > > > think
> > > > > > > it's
> > > > > > > > a
> > > > > > > > > > good
> > > > > > > > > > > > idea to change drainIndex to be random rather than
> > round
> > > > > robin
> > > > > > to
> > > > > > > > > avoid
> > > > > > > > > > > > forming patterns where some partitions would
> > consistently
> > > > get
> > > > > > > > higher
> > > > > > > > > > > > latencies than others because they wait longer for
> > their
> > > > > turn.
> > > > > > > > > > > >
> > > > > > > > > > > > If we really wanted to preserve the exact patterns,
> we
> > > > could
> > > > > > > either
> > > > > > > > > try
> > > > > > > > > > > to
> > > > > > > > > > > > support multiple 16KB batches from one partition per
> > > > request
> > > > > > > > > (probably
> > > > > > > > > > > > would require protocol change to change logic on the
> > > broker
> > > > > for
> > > > > > > > > > duplicate
> > > > > > > > > > > > detection) or try to re-batch 16KB batches from
> > > accumulator
> > > > > > into
> > > > > > > > > larger
> > > > > > > > > > > > batches during send (additional computations) or try
> to
> > > > > > consider
> > > > > > > > all
> > > > > > > > > > > > partitions assigned to a broker to check if a new
> batch
> > > > needs
> > > > > > to
> > > > > > > be
> > > > > > > > > > > created
> > > > > > > > > > > > (i.e. compare cumulative batch size from all
> partitions
> > > > > > assigned
> > > > > > > > to a
> > > > > > > > > > > > broker and create new batch when cumulative size is
> > 1MB,
> > > > more
> > > > > > > > > complex).
> > > > > > > > > > > >
> > > > > > > > > > > > Overall, it seems like just increasing the max batch
> > size
> > > > is
> > > > > a
> > > > > > > > > simpler
> > > > > > > > > > > > solution and it does favor larger batch sizes, which
> is
> > > > > > > beneficial
> > > > > > > > > not
> > > > > > > > > > > just
> > > > > > > > > > > > for production.
> > > > > > > > > > > >
> > > > > > > > > > > > > ready batches well past when they ought to be sent
> > > > > (according
> > > > > > > to
> > > > > > > > > > their
> > > > > > > > > > > > linger.ms)
> > > > > > > > > > > >
> > > > > > > > > > > > The trigger for marking batches ready to be sent
> isn't
> > > > > changed
> > > > > > -
> > > > > > > a
> > > > > > > > > > batch
> > > > > > > > > > > is
> > > > > > > > > > > > ready to be sent once it reaches 16KB, so by the time
> > > > larger
> > > > > > > > batches
> > > > > > > > > > > start
> > > > > > > > > > > > forming, linger.ms wouldn't matter much because the
> > > > batching
> > > > > > > goal
> > > > > > > > is
> > > > > > > > > > met
> > > > > > > > > > > > and the batch can be sent immediately.  Larger
> batches
> > > > start
> > > > > > > > forming
> > > > > > > > > > once
> > > > > > > > > > > > the client starts waiting for the server, in which
> case
> > > > some
> > > > > > data
> > > > > > > > > will
> > > > > > > > > > > wait
> > > > > > > > > > > > its turn to be sent.  This will happen for some data
> > > > > regardless
> > > > > > > of
> > > > > > > > > how
> > > > > > > > > > we
> > > > > > > > > > > > pick data to send, the question is just whether we'd
> > have
> > > > > some
> > > > > > > > > > scenarios
> > > > > > > > > > > > where some partitions would consistently experience
> > > higher
> > > > > > > latency
> > > > > > > > > than
> > > > > > > > > > > > others.  I think picking drainIndex randomly would
> > > prevent
> > > > > such
> > > > > > > > > > > scenarios.
> > > > > > > > > > > >
> > > > > > > > > > > > -Artem
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> > > > > > tbentley@redhat.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Luke,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Currently because buffers are allocated using
> > > batch.size
> > > > it
> > > > > > > means
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > > > handle records that are that large (e.g. one big
> > record
> > > > per
> > > > > > > > batch).
> > > > > > > > > > > > Doesn't
> > > > > > > > > > > > > the introduction of smaller buffer sizes
> > > > > (batch.initial.size)
> > > > > > > > mean
> > > > > > > > > a
> > > > > > > > > > > > > corresponding decrease in the maximum record size
> > that
> > > > the
> > > > > > > > producer
> > > > > > > > > > can
> > > > > > > > > > > > > handle? That might not be a problem if the user
> knows
> > > > their
> > > > > > > > maximum
> > > > > > > > > > > > record
> > > > > > > > > > > > > size and has tuned batch.initial.size accordingly,
> > but
> > > if
> > > > > the
> > > > > > > > > default
> > > > > > > > > > > for
> > > > > > > > > > > > > batch.initial.size < batch.size it could cause
> > > > regressions
> > > > > > for
> > > > > > > > > > existing
> > > > > > > > > > > > > users with a large record size, I think. It should
> be
> > > > > enough
> > > > > > > for
> > > > > > > > > > > > > batch.initial.size to default to batch.size,
> allowing
> > > > users
> > > > > > who
> > > > > > > > > care
> > > > > > > > > > > > about
> > > > > > > > > > > > > the memory saving in the off-peak throughput case
> to
> > do
> > > > the
> > > > > > > > tuning,
> > > > > > > > > > but
> > > > > > > > > > > > not
> > > > > > > > > > > > > causing a regression for existing users.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think this KIP would change the behaviour of
> > > producers
> > > > > when
> > > > > > > > there
> > > > > > > > > > are
> > > > > > > > > > > > > multiple partitions ready to be sent: By sending
> all
> > > the
> > > > > > ready
> > > > > > > > > > buffers
> > > > > > > > > > > > > (which may now be > batch.size) for the first
> > > partition,
> > > > we
> > > > > > > could
> > > > > > > > > end
> > > > > > > > > > > up
> > > > > > > > > > > > > excluding ready buffers for other partitions from
> the
> > > > > current
> > > > > > > > send.
> > > > > > > > > > In
> > > > > > > > > > > > > other words, as I understand the KIP currently,
> > > there's a
> > > > > > > change
> > > > > > > > in
> > > > > > > > > > > > > fairness. I think the code in
> > > > > > > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > > > > > > will
> > > > > > > > > > > > > ensure fairness in the long run, because the
> > drainIndex
> > > > > will
> > > > > > > > ensure
> > > > > > > > > > > that
> > > > > > > > > > > > > those other partitions each get their turn at being
> > the
> > > > > > first.
> > > > > > > > But
> > > > > > > > > > > isn't
> > > > > > > > > > > > > there the risk that drainBatchesForOneNode would
> end
> > up
> > > > not
> > > > > > > > sending
> > > > > > > > > > > ready
> > > > > > > > > > > > > batches well past when they ought to be sent
> > (according
> > > > to
> > > > > > > their
> > > > > > > > > > > > linger.ms
> > > > > > > > > > > > > ),
> > > > > > > > > > > > > because it's sending buffers for earlier partitions
> > too
> > > > > > > > > aggressively?
> > > > > > > > > > > Or,
> > > > > > > > > > > > > to put it another way, perhaps the
> RecordAccumulator
> > > > should
> > > > > > > > > > round-robin
> > > > > > > > > > > > the
> > > > > > > > > > > > > ready buffers for _all_ the partitions before
> trying
> > to
> > > > > fill
> > > > > > > the
> > > > > > > > > > > > remaining
> > > > > > > > > > > > > space with the extra buffers (beyond the batch.size
> > > > limit)
> > > > > > for
> > > > > > > > the
> > > > > > > > > > > first
> > > > > > > > > > > > > partitions?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Kind regards,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Tom
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <
> > > > > showuon@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > > > > > > If no, I'm going to update the KIP based on my
> > > previous
> > > > > > mail,
> > > > > > > > and
> > > > > > > > > > > > start a
> > > > > > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> > > > > > showuon@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. Why do we have to reallocate the buffer? We
> > can
> > > > > keep a
> > > > > > > > list
> > > > > > > > > of
> > > > > > > > > > > > > buffers
> > > > > > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > > > > > -> Do you mean we allocate multiple buffers
> with
> > > > > > > > > > > > "buffer.initial.size",
> > > > > > > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > > > > > > ex:
> > > > > > > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > > > > > > | 4KB |
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > b. when new records reached and the remaining
> > > buffer
> > > > is
> > > > > > not
> > > > > > > > > > enough
> > > > > > > > > > > > for
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > records, we create another batch with
> > > > > > "batch.initial.size"
> > > > > > > > > buffer
> > > > > > > > > > > > > > > ex: we already have 3KB of data in the 1st
> > buffer,
> > > > and
> > > > > > here
> > > > > > > > > comes
> > > > > > > > > > > the
> > > > > > > > > > > > > 2KB
> > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create
> > new
> > > > > > buffer,
> > > > > > > > and
> > > > > > > > > > > > linked
> > > > > > > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2. I think we should also consider tweaking the
> > > > > semantics
> > > > > > > of
> > > > > > > > > > > > batch.size
> > > > > > > > > > > > > > so
> > > > > > > > > > > > > > > that the sent batches can be larger if the
> batch
> > is
> > > > not
> > > > > > > ready
> > > > > > > > > to
> > > > > > > > > > be
> > > > > > > > > > > > > sent
> > > > > > > > > > > > > > > (while still respecting max.request.size and
> > > perhaps
> > > > a
> > > > > > new
> > > > > > > > > > > > > > max.batch.size).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > --> In the KIP, I was trying to make the
> > > "batch.size"
> > > > > as
> > > > > > > the
> > > > > > > > > > upper
> > > > > > > > > > > > > bound
> > > > > > > > > > > > > > > of the batch size, and introduce a
> > > > "batch.initial.size"
> > > > > > as
> > > > > > > > > > initial
> > > > > > > > > > > > > batch
> > > > > > > > > > > > > > > size.
> > > > > > > > > > > > > > > So are you saying that we can let "batch.size"
> as
> > > > > initial
> > > > > > > > batch
> > > > > > > > > > > size
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > > introduce a "max.batch.size" as upper bound
> > value?
> > > > > > > > > > > > > > > That's a good suggestion, but that would change
> > the
> > > > > > > semantics
> > > > > > > > > of
> > > > > > > > > > > > > > > "batch.size", which might surprise some users.
> I
> > > > think
> > > > > my
> > > > > > > > > > original
> > > > > > > > > > > > > > proposal
> > > > > > > > > > > > > > > ("batch.initial.size") is safer for users. What
> > do
> > > > you
> > > > > > > think?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > > > > > > ismael@juma.me.uk
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> I think we should also consider tweaking the
> > > > semantics
> > > > > > of
> > > > > > > > > > > batch.size
> > > > > > > > > > > > > so
> > > > > > > > > > > > > > >> that the sent batches can be larger if the
> batch
> > > is
> > > > > not
> > > > > > > > ready
> > > > > > > > > to
> > > > > > > > > > > be
> > > > > > > > > > > > > sent
> > > > > > > > > > > > > > >> (while still respecting max.request.size and
> > > > perhaps a
> > > > > > new
> > > > > > > > > > > > > > >> max.batch.size).
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Ismael
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > > > > > > ismael@juma.me.uk
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Thanks for the KIP. Why do we have to
> > reallocate
> > > > the
> > > > > > > > buffer?
> > > > > > > > > > We
> > > > > > > > > > > > can
> > > > > > > > > > > > > > >> keep a
> > > > > > > > > > > > > > >> > list of buffers instead and avoid
> > reallocation.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Ismael
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > > > > > > showuon@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > > > > > >> >> I'd like to start the discussion for the
> > > > proposal:
> > > > > > > > KIP-782:
> > > > > > > > > > > > > > Expandable
> > > > > > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >> The main purpose for this KIP is to have
> > better
> > > > > > memory
> > > > > > > > > usage
> > > > > > > > > > in
> > > > > > > > > > > > > > >> producer,
> > > > > > > > > > > > > > >> >> and also save users from the dilemma while
> > > > setting
> > > > > > the
> > > > > > > > > batch
> > > > > > > > > > > size
> > > > > > > > > > > > > > >> >> configuration. After this KIP, users can
> set
> > a
> > > > > higher
> > > > > > > > > > > batch.size
> > > > > > > > > > > > > > >> without
> > > > > > > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > > > > > > "batch.initial.size"
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >> Thank you.
> > > > > > > > > > > > > > >> >> Luke
> > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Lucas Bradstreet <lu...@confluent.io.INVALID>.
Hi Jun,

One difference compared to increasing the default batch size is that users
may actually prefer smaller batches but it makes much less sense to
accumulate many small batches if a batch is already sending.

For example, imagine a user that prefer 16K batches with 5ms linger.
Everything is functioning normally and 16KB batches are being sent. Then
there's a 500ms blip for that broker. Do we want to continue to accumulate
16KB batches, each of which requires a round trip, or would we prefer to
accumulate larger batches while sending is blocked?

I'm not hugely against increasing the default batch.size in general, but
batch.max.size does seem to have some nice properties.

Thanks,

Lucas

On Fri, Dec 10, 2021 at 9:42 AM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Artem, Luke,
>
> Thanks for the reply.
>
> 11. If we get rid of batch.max.size and increase the default batch.size,
> it's true the behavior is slightly different than before. However, does
> that difference matter to most of our users? In your example, if a user
> sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
> matter to deliver any batch smaller than 256KB before 100ms? I also find it
> a bit hard to explain to our users these 3 different settings related to
> batch size.
>
> Thanks,
>
> Jun
>
> On Thu, Dec 9, 2021 at 5:47 AM Luke Chen <sh...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > 11. In addition to Artem's comment, I think the reason to have additional
> > "batch.max.size" is to have more flexibility to users.
> > For example:
> > With linger.ms=100ms, batch.size=16KB, now, we have 20KB of data coming
> to
> > a partition within 50ms. Now, sender is ready to pick up the batch to
> send.
> > In current design, we send 16KB data to broker, and keep the remaining
> 4KB
> > in the producer, to keep accumulating data.
> > But after this KIP, user can send the whole 20KB of data together. That
> is,
> > user can decide if they want to accumulate more data before the sender is
> > ready, and send them together, to have higher throughput. The
> > "batch.size=16KB" in the proposal, is more like a soft limit, (and
> > "batch.max.size" is like a hard limit), or it's like a switch to enable
> the
> > batch to become ready. Before sender is ready, we can still accumulate
> more
> > data, and wrap them together to send to broker.
> >
> > User can increase "batch.size" to 20KB to achieve the same goal in the
> > current design, of course. But you can imagine, if the data within 100ms
> is
> > just 18KB, then the batch of data will wait for 100ms passed to be sent
> > out. This "batch.max.size" config will allow more flexible for user
> config.
> >
> > Does that make sense?
> >
> > Thank you.
> > Luke
> >
> > On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
> > <al...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > 11. That was my initial thinking as well, but in a discussion some
> people
> > > pointed out the change of behavior in some scenarios.  E.g. if someone
> > for
> > > some reason really wants batches to be at least 16KB and sets large
> > > linger.ms, and most of the time the batches are filled quickly enough
> > and
> > > they observe a certain latency.  Then they upgrade their client with a
> > > default size 256KB and the latency increases.  This could be seen as a
> > > regression.  It could be fixed by just reducing linger.ms to specify
> the
> > > expected latency, but still could be seen as a disruption by some
> users.
> > > The other reason to have 2 sizes is to avoid allocating large buffers
> > > upfront.
> > >
> > > -Artem
> > >
> > > On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <ju...@confluent.io.invalid>
> wrote:
> > >
> > > > Hi, Artem,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 11. Got it. To me, batch.size is really used for throughput and not
> for
> > > > latency guarantees. There is no guarantee when 16KB will be
> > accumulated.
> > > > So, if users want any latency guarantee, they will need to specify
> > > > linger.ms accordingly.
> > > > Then, batch.size can just be used to tune for throughput.
> > > >
> > > > 20. Could we also describe the unit of compression? Is
> > > > it batch.initial.size, batch.size or batch.max.size?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > > > <al...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 10. My understanding is that MemoryRecords would under the covers
> be
> > > > > allocated in chunks, so logically it still would be one
> MemoryRecords
> > > > > object, it's just instead of allocating one large chunk upfront,
> > > smaller
> > > > > chunks are allocated as needed to grow the batch and linked into a
> > > list.
> > > > >
> > > > > 11. The reason for 2 sizes is to avoid change of behavior when
> > > triggering
> > > > > batch send with large linger.ms.  Currently, a batch send is
> > triggered
> > > > > once
> > > > > the batch reaches 16KB by default, if we just raise the default to
> > > 256KB,
> > > > > then the batch send will be delayed.  Using a separate value would
> > > allow
> > > > > keeping the current behavior when sending the batch out, but
> provide
> > > > better
> > > > > throughput with high latency + high bandwidth channels.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <ju...@confluent.io.invalid>
> > > wrote:
> > > > >
> > > > > > Hi, Luke,
> > > > > >
> > > > > > Thanks for the KIP.  A few comments below.
> > > > > >
> > > > > > 10. Accumulating small batches could improve memory usage. Will
> > that
> > > > > > introduce extra copying when generating a produce request?
> > > Currently, a
> > > > > > produce request takes a single MemoryRecords per partition.
> > > > > > 11. Do we need to introduce a new config batch.max.size? Could we
> > > just
> > > > > > increase the default of batch.size? We probably need to have
> > KIP-794
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > >
> > > > > > resolved
> > > > > > before increasing the default batch size since the larger the
> batch
> > > > size,
> > > > > > the worse the problem in KIP-794.
> > > > > > 12. As for max.request.size, currently it's used for both the max
> > > > record
> > > > > > size and the max request size, which is unintuitive. Perhaps we
> > could
> > > > > > introduce a new config max.record.size that defaults to 1MB. We
> > could
> > > > > then
> > > > > > increase max.request.size to sth like 10MB.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > > > > <al...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Luke,
> > > > > > >
> > > > > > > I don't mind increasing the max.request.size to a higher
> number,
> > > e.g.
> > > > > 2MB
> > > > > > > could be good.  I think we should also run some benchmarks to
> see
> > > the
> > > > > > > effects of different sizes.
> > > > > > >
> > > > > > > I agree that changing round robin to random solves an
> independent
> > > > > > existing
> > > > > > > issue, however the logic in this KIP exacerbates the issue, so
> > > there
> > > > is
> > > > > > > some dependency.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Artem,
> > > > > > > > Yes, I agree if we go with random selection instead of
> > > round-robin
> > > > > > > > selection, the latency issue will be more fair. That is, if
> > there
> > > > are
> > > > > > 10
> > > > > > > > partitions, the 10th partition will always be the last choice
> > in
> > > > each
> > > > > > > round
> > > > > > > > in current design, but with random selection, the chance to
> be
> > > > > selected
> > > > > > > is
> > > > > > > > more fair.
> > > > > > > >
> > > > > > > > However, I think that's kind of out of scope with this KIP.
> > This
> > > is
> > > > > an
> > > > > > > > existing issue, and it might need further discussion to
> decide
> > if
> > > > > this
> > > > > > > > change is necessary.
> > > > > > > >
> > > > > > > > I agree the default 32KB for "batch.max.size" might be not
> huge
> > > > > > > improvement
> > > > > > > > compared with 256KB. I'm thinking, maybe default to "64KB"
> for
> > > > > > > > "batch.max.size", and make the documentation clear that if
> the
> > > > > > > > "batch.max.size"
> > > > > > > > is increased, there might be chances that the "ready"
> > partitions
> > > > need
> > > > > > to
> > > > > > > > wait for next request to send to broker, because of the
> > > > > > > "max.request.size"
> > > > > > > > (default 1MB) limitation. "max.request.size" can also be
> > > considered
> > > > > to
> > > > > > > > increase to avoid this issue. What do you think?
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > >  maybe I can firstly decrease the "batch.max.size" to
> 32KB
> > > > > > > > >
> > > > > > > > > I think 32KB is too small.  With 5 in-flight and 100ms
> > latency
> > > we
> > > > > can
> > > > > > > > > produce 1.6MB/s per partition.  With 256KB we can produce
> > > > 12.8MB/s
> > > > > > per
> > > > > > > > > partition.  We should probably set up some testing and see
> if
> > > > 256KB
> > > > > > has
> > > > > > > > > problems.
> > > > > > > > >
> > > > > > > > > To illustrate latency dynamics, let's consider a simplified
> > > > model:
> > > > > 1
> > > > > > > > > in-flight request per broker, produce latency 125ms, 256KB
> > max
> > > > > > request
> > > > > > > > > size, 16 partitions assigned to the same broker, every
> second
> > > > 128KB
> > > > > > is
> > > > > > > > > produced to each partition (total production rate is
> > 2MB/sec).
> > > > > > > > >
> > > > > > > > > If the batch size is 16KB, then the pattern would be the
> > > > following:
> > > > > > > > >
> > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > > > > > 125ms - complete first 16KB from each partition, send next
> > 16KB
> > > > > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > > > > ...
> > > > > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > > > > >
> > > > > > > > > from this model it's easy to see that there are 256KB that
> > are
> > > > sent
> > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that
> are
> > > > sent
> > > > > in
> > > > > > > > > 875ms.
> > > > > > > > >
> > > > > > > > > If the batch size is 256KB, then the pattern would be the
> > > > > following:
> > > > > > > > >
> > > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > > 0ms - take 128KB each from first 2 partitions and send
> (total
> > > > > 256KB)
> > > > > > > > > 125ms - complete 2 first partitions, send data from next 2
> > > > > partitions
> > > > > > > > > ...
> > > > > > > > > 1000ms - complete last 2 partitions
> > > > > > > > >
> > > > > > > > > even though the pattern is different, there are still 256KB
> > > that
> > > > > are
> > > > > > > sent
> > > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that
> are
> > > > sent
> > > > > in
> > > > > > > > > 875ms.
> > > > > > > > >
> > > > > > > > > Now, in this example if we do strictly round-robin (current
> > > > > > > > implementation)
> > > > > > > > > and we have this exact pattern (not sure how often such
> > regular
> > > > > > pattern
> > > > > > > > > would happen in practice -- I would expect that it would
> be a
> > > bit
> > > > > > more
> > > > > > > > > random), some partitions would experience higher latency
> than
> > > > > others
> > > > > > > (not
> > > > > > > > > sure how much it would matter in practice -- in the end of
> > the
> > > > day
> > > > > > some
> > > > > > > > > bytes produced to a topic would have higher latency and
> some
> > > > bytes
> > > > > > > would
> > > > > > > > > have lower latency).  This pattern is easily fixed by
> > choosing
> > > > the
> > > > > > next
> > > > > > > > > partition randomly instead of using round-robin.
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <
> > showuon@gmail.com>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Tom,
> > > > > > > > > > Thanks for your comments. And thanks for Artem's
> > explanation.
> > > > > > > > > > Below is my response:
> > > > > > > > > >
> > > > > > > > > > > Currently because buffers are allocated using
> batch.size
> > it
> > > > > means
> > > > > > > we
> > > > > > > > > can
> > > > > > > > > > handle records that are that large (e.g. one big record
> per
> > > > > batch).
> > > > > > > > > Doesn't
> > > > > > > > > > the introduction of smaller buffer sizes
> > (batch.initial.size)
> > > > > mean
> > > > > > a
> > > > > > > > > > corresponding decrease in the maximum record size that
> the
> > > > > producer
> > > > > > > can
> > > > > > > > > > handle?
> > > > > > > > > >
> > > > > > > > > > Actually, the "batch.size" is only like a threshold to
> > decide
> > > > if
> > > > > > the
> > > > > > > > > batch
> > > > > > > > > > is "ready to be sent". That is, even if you set the
> > > > > > "batch.size=16KB"
> > > > > > > > > > (default value), users can still send one record sized
> with
> > > > 20KB,
> > > > > > as
> > > > > > > > long
> > > > > > > > > > as the size is less than "max.request.size" in producer
> > > > (default
> > > > > > > 1MB).
> > > > > > > > > > Therefore, the introduction of "batch.initial.size" won't
> > > > > decrease
> > > > > > > the
> > > > > > > > > > maximum record size that the producer can handle.
> > > > > > > > > >
> > > > > > > > > > > But isn't there the risk that drainBatchesForOneNode
> > would
> > > > end
> > > > > up
> > > > > > > not
> > > > > > > > > > sending ready
> > > > > > > > > > batches well past when they ought to be sent (according
> to
> > > > their
> > > > > > > > > linger.ms
> > > > > > > > > > ),
> > > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > > aggressively?
> > > > > > > > > >
> > > > > > > > > > Did you mean that we have a "max.request.size" per
> request
> > > > > (default
> > > > > > > is
> > > > > > > > > > 1MB), and before this KIP, the request can include 64
> > batches
> > > > in
> > > > > > > single
> > > > > > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we
> might
> > be
> > > > > able
> > > > > > to
> > > > > > > > > > include 32 batches or less, because we aggressively sent
> > more
> > > > > > records
> > > > > > > > in
> > > > > > > > > > one batch, is that what you meant? That's a really good
> > point
> > > > > that
> > > > > > > I've
> > > > > > > > > > never thought about. I think your suggestion to go
> through
> > > > other
> > > > > > > > > partitions
> > > > > > > > > > that just fit "batch.size", or expire "linger.ms" first,
> > > > before
> > > > > > > > handling
> > > > > > > > > > the one that is > "batch.size" limit is not a good way,
> > > because
> > > > > it
> > > > > > > > might
> > > > > > > > > > cause the one with size > "batch.size" always in the
> lowest
> > > > > > priority,
> > > > > > > > and
> > > > > > > > > > cause starving issue that the batch won't have chance to
> > get
> > > > > sent.
> > > > > > > > > >
> > > > > > > > > > I don't have better solution for it, but maybe I can
> > firstly
> > > > > > decrease
> > > > > > > > the
> > > > > > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB
> in
> > > the
> > > > > KIP.
> > > > > > > > That
> > > > > > > > > > should alleviate the problem. And still improve the
> > > throughput.
> > > > > > What
> > > > > > > do
> > > > > > > > > you
> > > > > > > > > > think?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > > I think this KIP would change the behaviour of
> > producers
> > > > when
> > > > > > > there
> > > > > > > > > are
> > > > > > > > > > > multiple partitions ready to be sent
> > > > > > > > > > >
> > > > > > > > > > > This is correct, the pattern changes and becomes more
> > > > > > > coarse-grained.
> > > > > > > > > > But
> > > > > > > > > > > I don't think it changes fairness over the long run.  I
> > > think
> > > > > > it's
> > > > > > > a
> > > > > > > > > good
> > > > > > > > > > > idea to change drainIndex to be random rather than
> round
> > > > robin
> > > > > to
> > > > > > > > avoid
> > > > > > > > > > > forming patterns where some partitions would
> consistently
> > > get
> > > > > > > higher
> > > > > > > > > > > latencies than others because they wait longer for
> their
> > > > turn.
> > > > > > > > > > >
> > > > > > > > > > > If we really wanted to preserve the exact patterns, we
> > > could
> > > > > > either
> > > > > > > > try
> > > > > > > > > > to
> > > > > > > > > > > support multiple 16KB batches from one partition per
> > > request
> > > > > > > > (probably
> > > > > > > > > > > would require protocol change to change logic on the
> > broker
> > > > for
> > > > > > > > > duplicate
> > > > > > > > > > > detection) or try to re-batch 16KB batches from
> > accumulator
> > > > > into
> > > > > > > > larger
> > > > > > > > > > > batches during send (additional computations) or try to
> > > > > consider
> > > > > > > all
> > > > > > > > > > > partitions assigned to a broker to check if a new batch
> > > needs
> > > > > to
> > > > > > be
> > > > > > > > > > created
> > > > > > > > > > > (i.e. compare cumulative batch size from all partitions
> > > > > assigned
> > > > > > > to a
> > > > > > > > > > > broker and create new batch when cumulative size is
> 1MB,
> > > more
> > > > > > > > complex).
> > > > > > > > > > >
> > > > > > > > > > > Overall, it seems like just increasing the max batch
> size
> > > is
> > > > a
> > > > > > > > simpler
> > > > > > > > > > > solution and it does favor larger batch sizes, which is
> > > > > > beneficial
> > > > > > > > not
> > > > > > > > > > just
> > > > > > > > > > > for production.
> > > > > > > > > > >
> > > > > > > > > > > > ready batches well past when they ought to be sent
> > > > (according
> > > > > > to
> > > > > > > > > their
> > > > > > > > > > > linger.ms)
> > > > > > > > > > >
> > > > > > > > > > > The trigger for marking batches ready to be sent isn't
> > > > changed
> > > > > -
> > > > > > a
> > > > > > > > > batch
> > > > > > > > > > is
> > > > > > > > > > > ready to be sent once it reaches 16KB, so by the time
> > > larger
> > > > > > > batches
> > > > > > > > > > start
> > > > > > > > > > > forming, linger.ms wouldn't matter much because the
> > > batching
> > > > > > goal
> > > > > > > is
> > > > > > > > > met
> > > > > > > > > > > and the batch can be sent immediately.  Larger batches
> > > start
> > > > > > > forming
> > > > > > > > > once
> > > > > > > > > > > the client starts waiting for the server, in which case
> > > some
> > > > > data
> > > > > > > > will
> > > > > > > > > > wait
> > > > > > > > > > > its turn to be sent.  This will happen for some data
> > > > regardless
> > > > > > of
> > > > > > > > how
> > > > > > > > > we
> > > > > > > > > > > pick data to send, the question is just whether we'd
> have
> > > > some
> > > > > > > > > scenarios
> > > > > > > > > > > where some partitions would consistently experience
> > higher
> > > > > > latency
> > > > > > > > than
> > > > > > > > > > > others.  I think picking drainIndex randomly would
> > prevent
> > > > such
> > > > > > > > > > scenarios.
> > > > > > > > > > >
> > > > > > > > > > > -Artem
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> > > > > tbentley@redhat.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Luke,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > >
> > > > > > > > > > > > Currently because buffers are allocated using
> > batch.size
> > > it
> > > > > > means
> > > > > > > > we
> > > > > > > > > > can
> > > > > > > > > > > > handle records that are that large (e.g. one big
> record
> > > per
> > > > > > > batch).
> > > > > > > > > > > Doesn't
> > > > > > > > > > > > the introduction of smaller buffer sizes
> > > > (batch.initial.size)
> > > > > > > mean
> > > > > > > > a
> > > > > > > > > > > > corresponding decrease in the maximum record size
> that
> > > the
> > > > > > > producer
> > > > > > > > > can
> > > > > > > > > > > > handle? That might not be a problem if the user knows
> > > their
> > > > > > > maximum
> > > > > > > > > > > record
> > > > > > > > > > > > size and has tuned batch.initial.size accordingly,
> but
> > if
> > > > the
> > > > > > > > default
> > > > > > > > > > for
> > > > > > > > > > > > batch.initial.size < batch.size it could cause
> > > regressions
> > > > > for
> > > > > > > > > existing
> > > > > > > > > > > > users with a large record size, I think. It should be
> > > > enough
> > > > > > for
> > > > > > > > > > > > batch.initial.size to default to batch.size, allowing
> > > users
> > > > > who
> > > > > > > > care
> > > > > > > > > > > about
> > > > > > > > > > > > the memory saving in the off-peak throughput case to
> do
> > > the
> > > > > > > tuning,
> > > > > > > > > but
> > > > > > > > > > > not
> > > > > > > > > > > > causing a regression for existing users.
> > > > > > > > > > > >
> > > > > > > > > > > > I think this KIP would change the behaviour of
> > producers
> > > > when
> > > > > > > there
> > > > > > > > > are
> > > > > > > > > > > > multiple partitions ready to be sent: By sending all
> > the
> > > > > ready
> > > > > > > > > buffers
> > > > > > > > > > > > (which may now be > batch.size) for the first
> > partition,
> > > we
> > > > > > could
> > > > > > > > end
> > > > > > > > > > up
> > > > > > > > > > > > excluding ready buffers for other partitions from the
> > > > current
> > > > > > > send.
> > > > > > > > > In
> > > > > > > > > > > > other words, as I understand the KIP currently,
> > there's a
> > > > > > change
> > > > > > > in
> > > > > > > > > > > > fairness. I think the code in
> > > > > > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > > > > > will
> > > > > > > > > > > > ensure fairness in the long run, because the
> drainIndex
> > > > will
> > > > > > > ensure
> > > > > > > > > > that
> > > > > > > > > > > > those other partitions each get their turn at being
> the
> > > > > first.
> > > > > > > But
> > > > > > > > > > isn't
> > > > > > > > > > > > there the risk that drainBatchesForOneNode would end
> up
> > > not
> > > > > > > sending
> > > > > > > > > > ready
> > > > > > > > > > > > batches well past when they ought to be sent
> (according
> > > to
> > > > > > their
> > > > > > > > > > > linger.ms
> > > > > > > > > > > > ),
> > > > > > > > > > > > because it's sending buffers for earlier partitions
> too
> > > > > > > > aggressively?
> > > > > > > > > > Or,
> > > > > > > > > > > > to put it another way, perhaps the RecordAccumulator
> > > should
> > > > > > > > > round-robin
> > > > > > > > > > > the
> > > > > > > > > > > > ready buffers for _all_ the partitions before trying
> to
> > > > fill
> > > > > > the
> > > > > > > > > > > remaining
> > > > > > > > > > > > space with the extra buffers (beyond the batch.size
> > > limit)
> > > > > for
> > > > > > > the
> > > > > > > > > > first
> > > > > > > > > > > > partitions?
> > > > > > > > > > > >
> > > > > > > > > > > > Kind regards,
> > > > > > > > > > > >
> > > > > > > > > > > > Tom
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <
> > > > showuon@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > > > > > If no, I'm going to update the KIP based on my
> > previous
> > > > > mail,
> > > > > > > and
> > > > > > > > > > > start a
> > > > > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > Luke
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> > > > > showuon@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. Why do we have to reallocate the buffer? We
> can
> > > > keep a
> > > > > > > list
> > > > > > > > of
> > > > > > > > > > > > buffers
> > > > > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > > > > > > "buffer.initial.size",
> > > > > > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > > > > > ex:
> > > > > > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > > > > > | 4KB |
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > b. when new records reached and the remaining
> > buffer
> > > is
> > > > > not
> > > > > > > > > enough
> > > > > > > > > > > for
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > records, we create another batch with
> > > > > "batch.initial.size"
> > > > > > > > buffer
> > > > > > > > > > > > > > ex: we already have 3KB of data in the 1st
> buffer,
> > > and
> > > > > here
> > > > > > > > comes
> > > > > > > > > > the
> > > > > > > > > > > > 2KB
> > > > > > > > > > > > > > record
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create
> new
> > > > > buffer,
> > > > > > > and
> > > > > > > > > > > linked
> > > > > > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. I think we should also consider tweaking the
> > > > semantics
> > > > > > of
> > > > > > > > > > > batch.size
> > > > > > > > > > > > > so
> > > > > > > > > > > > > > that the sent batches can be larger if the batch
> is
> > > not
> > > > > > ready
> > > > > > > > to
> > > > > > > > > be
> > > > > > > > > > > > sent
> > > > > > > > > > > > > > (while still respecting max.request.size and
> > perhaps
> > > a
> > > > > new
> > > > > > > > > > > > > max.batch.size).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > --> In the KIP, I was trying to make the
> > "batch.size"
> > > > as
> > > > > > the
> > > > > > > > > upper
> > > > > > > > > > > > bound
> > > > > > > > > > > > > > of the batch size, and introduce a
> > > "batch.initial.size"
> > > > > as
> > > > > > > > > initial
> > > > > > > > > > > > batch
> > > > > > > > > > > > > > size.
> > > > > > > > > > > > > > So are you saying that we can let "batch.size" as
> > > > initial
> > > > > > > batch
> > > > > > > > > > size
> > > > > > > > > > > > and
> > > > > > > > > > > > > > introduce a "max.batch.size" as upper bound
> value?
> > > > > > > > > > > > > > That's a good suggestion, but that would change
> the
> > > > > > semantics
> > > > > > > > of
> > > > > > > > > > > > > > "batch.size", which might surprise some users. I
> > > think
> > > > my
> > > > > > > > > original
> > > > > > > > > > > > > proposal
> > > > > > > > > > > > > > ("batch.initial.size") is safer for users. What
> do
> > > you
> > > > > > think?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > > > > > ismael@juma.me.uk
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> I think we should also consider tweaking the
> > > semantics
> > > > > of
> > > > > > > > > > batch.size
> > > > > > > > > > > > so
> > > > > > > > > > > > > >> that the sent batches can be larger if the batch
> > is
> > > > not
> > > > > > > ready
> > > > > > > > to
> > > > > > > > > > be
> > > > > > > > > > > > sent
> > > > > > > > > > > > > >> (while still respecting max.request.size and
> > > perhaps a
> > > > > new
> > > > > > > > > > > > > >> max.batch.size).
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Ismael
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > > > > > ismael@juma.me.uk
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Thanks for the KIP. Why do we have to
> reallocate
> > > the
> > > > > > > buffer?
> > > > > > > > > We
> > > > > > > > > > > can
> > > > > > > > > > > > > >> keep a
> > > > > > > > > > > > > >> > list of buffers instead and avoid
> reallocation.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Ismael
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > > > > > showuon@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > > > > >> >> I'd like to start the discussion for the
> > > proposal:
> > > > > > > KIP-782:
> > > > > > > > > > > > > Expandable
> > > > > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >> >> The main purpose for this KIP is to have
> better
> > > > > memory
> > > > > > > > usage
> > > > > > > > > in
> > > > > > > > > > > > > >> producer,
> > > > > > > > > > > > > >> >> and also save users from the dilemma while
> > > setting
> > > > > the
> > > > > > > > batch
> > > > > > > > > > size
> > > > > > > > > > > > > >> >> configuration. After this KIP, users can set
> a
> > > > higher
> > > > > > > > > > batch.size
> > > > > > > > > > > > > >> without
> > > > > > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > > > > > "batch.initial.size"
> > > > > > > > > > > > and
> > > > > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >> >> Thank you.
> > > > > > > > > > > > > >> >> Luke
> > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Artem, Luke,

Thanks for the reply.

11. If we get rid of batch.max.size and increase the default batch.size,
it's true the behavior is slightly different than before. However, does
that difference matter to most of our users? In your example, if a user
sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
matter to deliver any batch smaller than 256KB before 100ms? I also find it
a bit hard to explain to our users these 3 different settings related to
batch size.

Thanks,

Jun

On Thu, Dec 9, 2021 at 5:47 AM Luke Chen <sh...@gmail.com> wrote:

> Hi Jun,
>
> 11. In addition to Artem's comment, I think the reason to have additional
> "batch.max.size" is to have more flexibility to users.
> For example:
> With linger.ms=100ms, batch.size=16KB, now, we have 20KB of data coming to
> a partition within 50ms. Now, sender is ready to pick up the batch to send.
> In current design, we send 16KB data to broker, and keep the remaining 4KB
> in the producer, to keep accumulating data.
> But after this KIP, user can send the whole 20KB of data together. That is,
> user can decide if they want to accumulate more data before the sender is
> ready, and send them together, to have higher throughput. The
> "batch.size=16KB" in the proposal, is more like a soft limit, (and
> "batch.max.size" is like a hard limit), or it's like a switch to enable the
> batch to become ready. Before sender is ready, we can still accumulate more
> data, and wrap them together to send to broker.
>
> User can increase "batch.size" to 20KB to achieve the same goal in the
> current design, of course. But you can imagine, if the data within 100ms is
> just 18KB, then the batch of data will wait for 100ms passed to be sent
> out. This "batch.max.size" config will allow more flexible for user config.
>
> Does that make sense?
>
> Thank you.
> Luke
>
> On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
> <al...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > 11. That was my initial thinking as well, but in a discussion some people
> > pointed out the change of behavior in some scenarios.  E.g. if someone
> for
> > some reason really wants batches to be at least 16KB and sets large
> > linger.ms, and most of the time the batches are filled quickly enough
> and
> > they observe a certain latency.  Then they upgrade their client with a
> > default size 256KB and the latency increases.  This could be seen as a
> > regression.  It could be fixed by just reducing linger.ms to specify the
> > expected latency, but still could be seen as a disruption by some users.
> > The other reason to have 2 sizes is to avoid allocating large buffers
> > upfront.
> >
> > -Artem
> >
> > On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 11. Got it. To me, batch.size is really used for throughput and not for
> > > latency guarantees. There is no guarantee when 16KB will be
> accumulated.
> > > So, if users want any latency guarantee, they will need to specify
> > > linger.ms accordingly.
> > > Then, batch.size can just be used to tune for throughput.
> > >
> > > 20. Could we also describe the unit of compression? Is
> > > it batch.initial.size, batch.size or batch.max.size?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > > <al...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 10. My understanding is that MemoryRecords would under the covers be
> > > > allocated in chunks, so logically it still would be one MemoryRecords
> > > > object, it's just instead of allocating one large chunk upfront,
> > smaller
> > > > chunks are allocated as needed to grow the batch and linked into a
> > list.
> > > >
> > > > 11. The reason for 2 sizes is to avoid change of behavior when
> > triggering
> > > > batch send with large linger.ms.  Currently, a batch send is
> triggered
> > > > once
> > > > the batch reaches 16KB by default, if we just raise the default to
> > 256KB,
> > > > then the batch send will be delayed.  Using a separate value would
> > allow
> > > > keeping the current behavior when sending the batch out, but provide
> > > better
> > > > throughput with high latency + high bandwidth channels.
> > > >
> > > > -Artem
> > > >
> > > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <ju...@confluent.io.invalid>
> > wrote:
> > > >
> > > > > Hi, Luke,
> > > > >
> > > > > Thanks for the KIP.  A few comments below.
> > > > >
> > > > > 10. Accumulating small batches could improve memory usage. Will
> that
> > > > > introduce extra copying when generating a produce request?
> > Currently, a
> > > > > produce request takes a single MemoryRecords per partition.
> > > > > 11. Do we need to introduce a new config batch.max.size? Could we
> > just
> > > > > increase the default of batch.size? We probably need to have
> KIP-794
> > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > >
> > > > > resolved
> > > > > before increasing the default batch size since the larger the batch
> > > size,
> > > > > the worse the problem in KIP-794.
> > > > > 12. As for max.request.size, currently it's used for both the max
> > > record
> > > > > size and the max request size, which is unintuitive. Perhaps we
> could
> > > > > introduce a new config max.record.size that defaults to 1MB. We
> could
> > > > then
> > > > > increase max.request.size to sth like 10MB.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > > > <al...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Luke,
> > > > > >
> > > > > > I don't mind increasing the max.request.size to a higher number,
> > e.g.
> > > > 2MB
> > > > > > could be good.  I think we should also run some benchmarks to see
> > the
> > > > > > effects of different sizes.
> > > > > >
> > > > > > I agree that changing round robin to random solves an independent
> > > > > existing
> > > > > > issue, however the logic in this KIP exacerbates the issue, so
> > there
> > > is
> > > > > > some dependency.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi Artem,
> > > > > > > Yes, I agree if we go with random selection instead of
> > round-robin
> > > > > > > selection, the latency issue will be more fair. That is, if
> there
> > > are
> > > > > 10
> > > > > > > partitions, the 10th partition will always be the last choice
> in
> > > each
> > > > > > round
> > > > > > > in current design, but with random selection, the chance to be
> > > > selected
> > > > > > is
> > > > > > > more fair.
> > > > > > >
> > > > > > > However, I think that's kind of out of scope with this KIP.
> This
> > is
> > > > an
> > > > > > > existing issue, and it might need further discussion to decide
> if
> > > > this
> > > > > > > change is necessary.
> > > > > > >
> > > > > > > I agree the default 32KB for "batch.max.size" might be not huge
> > > > > > improvement
> > > > > > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > > > > > "batch.max.size", and make the documentation clear that if the
> > > > > > > "batch.max.size"
> > > > > > > is increased, there might be chances that the "ready"
> partitions
> > > need
> > > > > to
> > > > > > > wait for next request to send to broker, because of the
> > > > > > "max.request.size"
> > > > > > > (default 1MB) limitation. "max.request.size" can also be
> > considered
> > > > to
> > > > > > > increase to avoid this issue. What do you think?
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > > > > > >
> > > > > > > > I think 32KB is too small.  With 5 in-flight and 100ms
> latency
> > we
> > > > can
> > > > > > > > produce 1.6MB/s per partition.  With 256KB we can produce
> > > 12.8MB/s
> > > > > per
> > > > > > > > partition.  We should probably set up some testing and see if
> > > 256KB
> > > > > has
> > > > > > > > problems.
> > > > > > > >
> > > > > > > > To illustrate latency dynamics, let's consider a simplified
> > > model:
> > > > 1
> > > > > > > > in-flight request per broker, produce latency 125ms, 256KB
> max
> > > > > request
> > > > > > > > size, 16 partitions assigned to the same broker, every second
> > > 128KB
> > > > > is
> > > > > > > > produced to each partition (total production rate is
> 2MB/sec).
> > > > > > > >
> > > > > > > > If the batch size is 16KB, then the pattern would be the
> > > following:
> > > > > > > >
> > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > > > > 125ms - complete first 16KB from each partition, send next
> 16KB
> > > > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > > > ...
> > > > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > > > >
> > > > > > > > from this model it's easy to see that there are 256KB that
> are
> > > sent
> > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are
> > > sent
> > > > in
> > > > > > > > 875ms.
> > > > > > > >
> > > > > > > > If the batch size is 256KB, then the pattern would be the
> > > > following:
> > > > > > > >
> > > > > > > > 0ms - produce 128KB into each partition
> > > > > > > > 0ms - take 128KB each from first 2 partitions and send (total
> > > > 256KB)
> > > > > > > > 125ms - complete 2 first partitions, send data from next 2
> > > > partitions
> > > > > > > > ...
> > > > > > > > 1000ms - complete last 2 partitions
> > > > > > > >
> > > > > > > > even though the pattern is different, there are still 256KB
> > that
> > > > are
> > > > > > sent
> > > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are
> > > sent
> > > > in
> > > > > > > > 875ms.
> > > > > > > >
> > > > > > > > Now, in this example if we do strictly round-robin (current
> > > > > > > implementation)
> > > > > > > > and we have this exact pattern (not sure how often such
> regular
> > > > > pattern
> > > > > > > > would happen in practice -- I would expect that it would be a
> > bit
> > > > > more
> > > > > > > > random), some partitions would experience higher latency than
> > > > others
> > > > > > (not
> > > > > > > > sure how much it would matter in practice -- in the end of
> the
> > > day
> > > > > some
> > > > > > > > bytes produced to a topic would have higher latency and some
> > > bytes
> > > > > > would
> > > > > > > > have lower latency).  This pattern is easily fixed by
> choosing
> > > the
> > > > > next
> > > > > > > > partition randomly instead of using round-robin.
> > > > > > > >
> > > > > > > > -Artem
> > > > > > > >
> > > > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <
> showuon@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Tom,
> > > > > > > > > Thanks for your comments. And thanks for Artem's
> explanation.
> > > > > > > > > Below is my response:
> > > > > > > > >
> > > > > > > > > > Currently because buffers are allocated using batch.size
> it
> > > > means
> > > > > > we
> > > > > > > > can
> > > > > > > > > handle records that are that large (e.g. one big record per
> > > > batch).
> > > > > > > > Doesn't
> > > > > > > > > the introduction of smaller buffer sizes
> (batch.initial.size)
> > > > mean
> > > > > a
> > > > > > > > > corresponding decrease in the maximum record size that the
> > > > producer
> > > > > > can
> > > > > > > > > handle?
> > > > > > > > >
> > > > > > > > > Actually, the "batch.size" is only like a threshold to
> decide
> > > if
> > > > > the
> > > > > > > > batch
> > > > > > > > > is "ready to be sent". That is, even if you set the
> > > > > "batch.size=16KB"
> > > > > > > > > (default value), users can still send one record sized with
> > > 20KB,
> > > > > as
> > > > > > > long
> > > > > > > > > as the size is less than "max.request.size" in producer
> > > (default
> > > > > > 1MB).
> > > > > > > > > Therefore, the introduction of "batch.initial.size" won't
> > > > decrease
> > > > > > the
> > > > > > > > > maximum record size that the producer can handle.
> > > > > > > > >
> > > > > > > > > > But isn't there the risk that drainBatchesForOneNode
> would
> > > end
> > > > up
> > > > > > not
> > > > > > > > > sending ready
> > > > > > > > > batches well past when they ought to be sent (according to
> > > their
> > > > > > > > linger.ms
> > > > > > > > > ),
> > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > aggressively?
> > > > > > > > >
> > > > > > > > > Did you mean that we have a "max.request.size" per request
> > > > (default
> > > > > > is
> > > > > > > > > 1MB), and before this KIP, the request can include 64
> batches
> > > in
> > > > > > single
> > > > > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might
> be
> > > > able
> > > > > to
> > > > > > > > > include 32 batches or less, because we aggressively sent
> more
> > > > > records
> > > > > > > in
> > > > > > > > > one batch, is that what you meant? That's a really good
> point
> > > > that
> > > > > > I've
> > > > > > > > > never thought about. I think your suggestion to go through
> > > other
> > > > > > > > partitions
> > > > > > > > > that just fit "batch.size", or expire "linger.ms" first,
> > > before
> > > > > > > handling
> > > > > > > > > the one that is > "batch.size" limit is not a good way,
> > because
> > > > it
> > > > > > > might
> > > > > > > > > cause the one with size > "batch.size" always in the lowest
> > > > > priority,
> > > > > > > and
> > > > > > > > > cause starving issue that the batch won't have chance to
> get
> > > > sent.
> > > > > > > > >
> > > > > > > > > I don't have better solution for it, but maybe I can
> firstly
> > > > > decrease
> > > > > > > the
> > > > > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB in
> > the
> > > > KIP.
> > > > > > > That
> > > > > > > > > should alleviate the problem. And still improve the
> > throughput.
> > > > > What
> > > > > > do
> > > > > > > > you
> > > > > > > > > think?
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > > I think this KIP would change the behaviour of
> producers
> > > when
> > > > > > there
> > > > > > > > are
> > > > > > > > > > multiple partitions ready to be sent
> > > > > > > > > >
> > > > > > > > > > This is correct, the pattern changes and becomes more
> > > > > > coarse-grained.
> > > > > > > > > But
> > > > > > > > > > I don't think it changes fairness over the long run.  I
> > think
> > > > > it's
> > > > > > a
> > > > > > > > good
> > > > > > > > > > idea to change drainIndex to be random rather than round
> > > robin
> > > > to
> > > > > > > avoid
> > > > > > > > > > forming patterns where some partitions would consistently
> > get
> > > > > > higher
> > > > > > > > > > latencies than others because they wait longer for their
> > > turn.
> > > > > > > > > >
> > > > > > > > > > If we really wanted to preserve the exact patterns, we
> > could
> > > > > either
> > > > > > > try
> > > > > > > > > to
> > > > > > > > > > support multiple 16KB batches from one partition per
> > request
> > > > > > > (probably
> > > > > > > > > > would require protocol change to change logic on the
> broker
> > > for
> > > > > > > > duplicate
> > > > > > > > > > detection) or try to re-batch 16KB batches from
> accumulator
> > > > into
> > > > > > > larger
> > > > > > > > > > batches during send (additional computations) or try to
> > > > consider
> > > > > > all
> > > > > > > > > > partitions assigned to a broker to check if a new batch
> > needs
> > > > to
> > > > > be
> > > > > > > > > created
> > > > > > > > > > (i.e. compare cumulative batch size from all partitions
> > > > assigned
> > > > > > to a
> > > > > > > > > > broker and create new batch when cumulative size is 1MB,
> > more
> > > > > > > complex).
> > > > > > > > > >
> > > > > > > > > > Overall, it seems like just increasing the max batch size
> > is
> > > a
> > > > > > > simpler
> > > > > > > > > > solution and it does favor larger batch sizes, which is
> > > > > beneficial
> > > > > > > not
> > > > > > > > > just
> > > > > > > > > > for production.
> > > > > > > > > >
> > > > > > > > > > > ready batches well past when they ought to be sent
> > > (according
> > > > > to
> > > > > > > > their
> > > > > > > > > > linger.ms)
> > > > > > > > > >
> > > > > > > > > > The trigger for marking batches ready to be sent isn't
> > > changed
> > > > -
> > > > > a
> > > > > > > > batch
> > > > > > > > > is
> > > > > > > > > > ready to be sent once it reaches 16KB, so by the time
> > larger
> > > > > > batches
> > > > > > > > > start
> > > > > > > > > > forming, linger.ms wouldn't matter much because the
> > batching
> > > > > goal
> > > > > > is
> > > > > > > > met
> > > > > > > > > > and the batch can be sent immediately.  Larger batches
> > start
> > > > > > forming
> > > > > > > > once
> > > > > > > > > > the client starts waiting for the server, in which case
> > some
> > > > data
> > > > > > > will
> > > > > > > > > wait
> > > > > > > > > > its turn to be sent.  This will happen for some data
> > > regardless
> > > > > of
> > > > > > > how
> > > > > > > > we
> > > > > > > > > > pick data to send, the question is just whether we'd have
> > > some
> > > > > > > > scenarios
> > > > > > > > > > where some partitions would consistently experience
> higher
> > > > > latency
> > > > > > > than
> > > > > > > > > > others.  I think picking drainIndex randomly would
> prevent
> > > such
> > > > > > > > > scenarios.
> > > > > > > > > >
> > > > > > > > > > -Artem
> > > > > > > > > >
> > > > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> > > > tbentley@redhat.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Luke,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > >
> > > > > > > > > > > Currently because buffers are allocated using
> batch.size
> > it
> > > > > means
> > > > > > > we
> > > > > > > > > can
> > > > > > > > > > > handle records that are that large (e.g. one big record
> > per
> > > > > > batch).
> > > > > > > > > > Doesn't
> > > > > > > > > > > the introduction of smaller buffer sizes
> > > (batch.initial.size)
> > > > > > mean
> > > > > > > a
> > > > > > > > > > > corresponding decrease in the maximum record size that
> > the
> > > > > > producer
> > > > > > > > can
> > > > > > > > > > > handle? That might not be a problem if the user knows
> > their
> > > > > > maximum
> > > > > > > > > > record
> > > > > > > > > > > size and has tuned batch.initial.size accordingly, but
> if
> > > the
> > > > > > > default
> > > > > > > > > for
> > > > > > > > > > > batch.initial.size < batch.size it could cause
> > regressions
> > > > for
> > > > > > > > existing
> > > > > > > > > > > users with a large record size, I think. It should be
> > > enough
> > > > > for
> > > > > > > > > > > batch.initial.size to default to batch.size, allowing
> > users
> > > > who
> > > > > > > care
> > > > > > > > > > about
> > > > > > > > > > > the memory saving in the off-peak throughput case to do
> > the
> > > > > > tuning,
> > > > > > > > but
> > > > > > > > > > not
> > > > > > > > > > > causing a regression for existing users.
> > > > > > > > > > >
> > > > > > > > > > > I think this KIP would change the behaviour of
> producers
> > > when
> > > > > > there
> > > > > > > > are
> > > > > > > > > > > multiple partitions ready to be sent: By sending all
> the
> > > > ready
> > > > > > > > buffers
> > > > > > > > > > > (which may now be > batch.size) for the first
> partition,
> > we
> > > > > could
> > > > > > > end
> > > > > > > > > up
> > > > > > > > > > > excluding ready buffers for other partitions from the
> > > current
> > > > > > send.
> > > > > > > > In
> > > > > > > > > > > other words, as I understand the KIP currently,
> there's a
> > > > > change
> > > > > > in
> > > > > > > > > > > fairness. I think the code in
> > > > > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > > > > will
> > > > > > > > > > > ensure fairness in the long run, because the drainIndex
> > > will
> > > > > > ensure
> > > > > > > > > that
> > > > > > > > > > > those other partitions each get their turn at being the
> > > > first.
> > > > > > But
> > > > > > > > > isn't
> > > > > > > > > > > there the risk that drainBatchesForOneNode would end up
> > not
> > > > > > sending
> > > > > > > > > ready
> > > > > > > > > > > batches well past when they ought to be sent (according
> > to
> > > > > their
> > > > > > > > > > linger.ms
> > > > > > > > > > > ),
> > > > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > > > aggressively?
> > > > > > > > > Or,
> > > > > > > > > > > to put it another way, perhaps the RecordAccumulator
> > should
> > > > > > > > round-robin
> > > > > > > > > > the
> > > > > > > > > > > ready buffers for _all_ the partitions before trying to
> > > fill
> > > > > the
> > > > > > > > > > remaining
> > > > > > > > > > > space with the extra buffers (beyond the batch.size
> > limit)
> > > > for
> > > > > > the
> > > > > > > > > first
> > > > > > > > > > > partitions?
> > > > > > > > > > >
> > > > > > > > > > > Kind regards,
> > > > > > > > > > >
> > > > > > > > > > > Tom
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <
> > > showuon@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > > > > If no, I'm going to update the KIP based on my
> previous
> > > > mail,
> > > > > > and
> > > > > > > > > > start a
> > > > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you.
> > > > > > > > > > > > Luke
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> > > > showuon@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. Why do we have to reallocate the buffer? We can
> > > keep a
> > > > > > list
> > > > > > > of
> > > > > > > > > > > buffers
> > > > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > > > > > "buffer.initial.size",
> > > > > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > > > > ex:
> > > > > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > > > > | 4KB |
> > > > > > > > > > > > >
> > > > > > > > > > > > > b. when new records reached and the remaining
> buffer
> > is
> > > > not
> > > > > > > > enough
> > > > > > > > > > for
> > > > > > > > > > > > the
> > > > > > > > > > > > > records, we create another batch with
> > > > "batch.initial.size"
> > > > > > > buffer
> > > > > > > > > > > > > ex: we already have 3KB of data in the 1st buffer,
> > and
> > > > here
> > > > > > > comes
> > > > > > > > > the
> > > > > > > > > > > 2KB
> > > > > > > > > > > > > record
> > > > > > > > > > > > >
> > > > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create new
> > > > buffer,
> > > > > > and
> > > > > > > > > > linked
> > > > > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > > > >
> > > > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. I think we should also consider tweaking the
> > > semantics
> > > > > of
> > > > > > > > > > batch.size
> > > > > > > > > > > > so
> > > > > > > > > > > > > that the sent batches can be larger if the batch is
> > not
> > > > > ready
> > > > > > > to
> > > > > > > > be
> > > > > > > > > > > sent
> > > > > > > > > > > > > (while still respecting max.request.size and
> perhaps
> > a
> > > > new
> > > > > > > > > > > > max.batch.size).
> > > > > > > > > > > > >
> > > > > > > > > > > > > --> In the KIP, I was trying to make the
> "batch.size"
> > > as
> > > > > the
> > > > > > > > upper
> > > > > > > > > > > bound
> > > > > > > > > > > > > of the batch size, and introduce a
> > "batch.initial.size"
> > > > as
> > > > > > > > initial
> > > > > > > > > > > batch
> > > > > > > > > > > > > size.
> > > > > > > > > > > > > So are you saying that we can let "batch.size" as
> > > initial
> > > > > > batch
> > > > > > > > > size
> > > > > > > > > > > and
> > > > > > > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > > > > > > That's a good suggestion, but that would change the
> > > > > semantics
> > > > > > > of
> > > > > > > > > > > > > "batch.size", which might surprise some users. I
> > think
> > > my
> > > > > > > > original
> > > > > > > > > > > > proposal
> > > > > > > > > > > > > ("batch.initial.size") is safer for users. What do
> > you
> > > > > think?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > Luke
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > > > > ismael@juma.me.uk
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> I think we should also consider tweaking the
> > semantics
> > > > of
> > > > > > > > > batch.size
> > > > > > > > > > > so
> > > > > > > > > > > > >> that the sent batches can be larger if the batch
> is
> > > not
> > > > > > ready
> > > > > > > to
> > > > > > > > > be
> > > > > > > > > > > sent
> > > > > > > > > > > > >> (while still respecting max.request.size and
> > perhaps a
> > > > new
> > > > > > > > > > > > >> max.batch.size).
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Ismael
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > > > > ismael@juma.me.uk
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks for the KIP. Why do we have to reallocate
> > the
> > > > > > buffer?
> > > > > > > > We
> > > > > > > > > > can
> > > > > > > > > > > > >> keep a
> > > > > > > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Ismael
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > > > > showuon@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > > > >> >> I'd like to start the discussion for the
> > proposal:
> > > > > > KIP-782:
> > > > > > > > > > > > Expandable
> > > > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >> >> The main purpose for this KIP is to have better
> > > > memory
> > > > > > > usage
> > > > > > > > in
> > > > > > > > > > > > >> producer,
> > > > > > > > > > > > >> >> and also save users from the dilemma while
> > setting
> > > > the
> > > > > > > batch
> > > > > > > > > size
> > > > > > > > > > > > >> >> configuration. After this KIP, users can set a
> > > higher
> > > > > > > > > batch.size
> > > > > > > > > > > > >> without
> > > > > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > > > > "batch.initial.size"
> > > > > > > > > > > and
> > > > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >> >> Thank you.
> > > > > > > > > > > > >> >> Luke
> > > > > > > > > > > > >> >>
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Luke Chen <sh...@gmail.com>.
Hi Jun,

11. In addition to Artem's comment, I think the reason to have additional
"batch.max.size" is to have more flexibility to users.
For example:
With linger.ms=100ms, batch.size=16KB, now, we have 20KB of data coming to
a partition within 50ms. Now, sender is ready to pick up the batch to send.
In current design, we send 16KB data to broker, and keep the remaining 4KB
in the producer, to keep accumulating data.
But after this KIP, user can send the whole 20KB of data together. That is,
user can decide if they want to accumulate more data before the sender is
ready, and send them together, to have higher throughput. The
"batch.size=16KB" in the proposal, is more like a soft limit, (and
"batch.max.size" is like a hard limit), or it's like a switch to enable the
batch to become ready. Before sender is ready, we can still accumulate more
data, and wrap them together to send to broker.

User can increase "batch.size" to 20KB to achieve the same goal in the
current design, of course. But you can imagine, if the data within 100ms is
just 18KB, then the batch of data will wait for 100ms passed to be sent
out. This "batch.max.size" config will allow more flexible for user config.

Does that make sense?

Thank you.
Luke

On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
<al...@confluent.io.invalid> wrote:

> Hi Jun,
>
> 11. That was my initial thinking as well, but in a discussion some people
> pointed out the change of behavior in some scenarios.  E.g. if someone for
> some reason really wants batches to be at least 16KB and sets large
> linger.ms, and most of the time the batches are filled quickly enough and
> they observe a certain latency.  Then they upgrade their client with a
> default size 256KB and the latency increases.  This could be seen as a
> regression.  It could be fixed by just reducing linger.ms to specify the
> expected latency, but still could be seen as a disruption by some users.
> The other reason to have 2 sizes is to avoid allocating large buffers
> upfront.
>
> -Artem
>
> On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <ju...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 11. Got it. To me, batch.size is really used for throughput and not for
> > latency guarantees. There is no guarantee when 16KB will be accumulated.
> > So, if users want any latency guarantee, they will need to specify
> > linger.ms accordingly.
> > Then, batch.size can just be used to tune for throughput.
> >
> > 20. Could we also describe the unit of compression? Is
> > it batch.initial.size, batch.size or batch.max.size?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > <al...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > 10. My understanding is that MemoryRecords would under the covers be
> > > allocated in chunks, so logically it still would be one MemoryRecords
> > > object, it's just instead of allocating one large chunk upfront,
> smaller
> > > chunks are allocated as needed to grow the batch and linked into a
> list.
> > >
> > > 11. The reason for 2 sizes is to avoid change of behavior when
> triggering
> > > batch send with large linger.ms.  Currently, a batch send is triggered
> > > once
> > > the batch reaches 16KB by default, if we just raise the default to
> 256KB,
> > > then the batch send will be delayed.  Using a separate value would
> allow
> > > keeping the current behavior when sending the batch out, but provide
> > better
> > > throughput with high latency + high bandwidth channels.
> > >
> > > -Artem
> > >
> > > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <ju...@confluent.io.invalid>
> wrote:
> > >
> > > > Hi, Luke,
> > > >
> > > > Thanks for the KIP.  A few comments below.
> > > >
> > > > 10. Accumulating small batches could improve memory usage. Will that
> > > > introduce extra copying when generating a produce request?
> Currently, a
> > > > produce request takes a single MemoryRecords per partition.
> > > > 11. Do we need to introduce a new config batch.max.size? Could we
> just
> > > > increase the default of batch.size? We probably need to have KIP-794
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > >
> > > > resolved
> > > > before increasing the default batch size since the larger the batch
> > size,
> > > > the worse the problem in KIP-794.
> > > > 12. As for max.request.size, currently it's used for both the max
> > record
> > > > size and the max request size, which is unintuitive. Perhaps we could
> > > > introduce a new config max.record.size that defaults to 1MB. We could
> > > then
> > > > increase max.request.size to sth like 10MB.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > > <al...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Luke,
> > > > >
> > > > > I don't mind increasing the max.request.size to a higher number,
> e.g.
> > > 2MB
> > > > > could be good.  I think we should also run some benchmarks to see
> the
> > > > > effects of different sizes.
> > > > >
> > > > > I agree that changing round robin to random solves an independent
> > > > existing
> > > > > issue, however the logic in this KIP exacerbates the issue, so
> there
> > is
> > > > > some dependency.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi Artem,
> > > > > > Yes, I agree if we go with random selection instead of
> round-robin
> > > > > > selection, the latency issue will be more fair. That is, if there
> > are
> > > > 10
> > > > > > partitions, the 10th partition will always be the last choice in
> > each
> > > > > round
> > > > > > in current design, but with random selection, the chance to be
> > > selected
> > > > > is
> > > > > > more fair.
> > > > > >
> > > > > > However, I think that's kind of out of scope with this KIP. This
> is
> > > an
> > > > > > existing issue, and it might need further discussion to decide if
> > > this
> > > > > > change is necessary.
> > > > > >
> > > > > > I agree the default 32KB for "batch.max.size" might be not huge
> > > > > improvement
> > > > > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > > > > "batch.max.size", and make the documentation clear that if the
> > > > > > "batch.max.size"
> > > > > > is increased, there might be chances that the "ready" partitions
> > need
> > > > to
> > > > > > wait for next request to send to broker, because of the
> > > > > "max.request.size"
> > > > > > (default 1MB) limitation. "max.request.size" can also be
> considered
> > > to
> > > > > > increase to avoid this issue. What do you think?
> > > > > >
> > > > > > Thank you.
> > > > > > Luke
> > > > > >
> > > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > > <al...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > > > > >
> > > > > > > I think 32KB is too small.  With 5 in-flight and 100ms latency
> we
> > > can
> > > > > > > produce 1.6MB/s per partition.  With 256KB we can produce
> > 12.8MB/s
> > > > per
> > > > > > > partition.  We should probably set up some testing and see if
> > 256KB
> > > > has
> > > > > > > problems.
> > > > > > >
> > > > > > > To illustrate latency dynamics, let's consider a simplified
> > model:
> > > 1
> > > > > > > in-flight request per broker, produce latency 125ms, 256KB max
> > > > request
> > > > > > > size, 16 partitions assigned to the same broker, every second
> > 128KB
> > > > is
> > > > > > > produced to each partition (total production rate is 2MB/sec).
> > > > > > >
> > > > > > > If the batch size is 16KB, then the pattern would be the
> > following:
> > > > > > >
> > > > > > > 0ms - produce 128KB into each partition
> > > > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > > > 125ms - complete first 16KB from each partition, send next 16KB
> > > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > > ...
> > > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > > >
> > > > > > > from this model it's easy to see that there are 256KB that are
> > sent
> > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are
> > sent
> > > in
> > > > > > > 875ms.
> > > > > > >
> > > > > > > If the batch size is 256KB, then the pattern would be the
> > > following:
> > > > > > >
> > > > > > > 0ms - produce 128KB into each partition
> > > > > > > 0ms - take 128KB each from first 2 partitions and send (total
> > > 256KB)
> > > > > > > 125ms - complete 2 first partitions, send data from next 2
> > > partitions
> > > > > > > ...
> > > > > > > 1000ms - complete last 2 partitions
> > > > > > >
> > > > > > > even though the pattern is different, there are still 256KB
> that
> > > are
> > > > > sent
> > > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are
> > sent
> > > in
> > > > > > > 875ms.
> > > > > > >
> > > > > > > Now, in this example if we do strictly round-robin (current
> > > > > > implementation)
> > > > > > > and we have this exact pattern (not sure how often such regular
> > > > pattern
> > > > > > > would happen in practice -- I would expect that it would be a
> bit
> > > > more
> > > > > > > random), some partitions would experience higher latency than
> > > others
> > > > > (not
> > > > > > > sure how much it would matter in practice -- in the end of the
> > day
> > > > some
> > > > > > > bytes produced to a topic would have higher latency and some
> > bytes
> > > > > would
> > > > > > > have lower latency).  This pattern is easily fixed by choosing
> > the
> > > > next
> > > > > > > partition randomly instead of using round-robin.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <sh...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Tom,
> > > > > > > > Thanks for your comments. And thanks for Artem's explanation.
> > > > > > > > Below is my response:
> > > > > > > >
> > > > > > > > > Currently because buffers are allocated using batch.size it
> > > means
> > > > > we
> > > > > > > can
> > > > > > > > handle records that are that large (e.g. one big record per
> > > batch).
> > > > > > > Doesn't
> > > > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> > > mean
> > > > a
> > > > > > > > corresponding decrease in the maximum record size that the
> > > producer
> > > > > can
> > > > > > > > handle?
> > > > > > > >
> > > > > > > > Actually, the "batch.size" is only like a threshold to decide
> > if
> > > > the
> > > > > > > batch
> > > > > > > > is "ready to be sent". That is, even if you set the
> > > > "batch.size=16KB"
> > > > > > > > (default value), users can still send one record sized with
> > 20KB,
> > > > as
> > > > > > long
> > > > > > > > as the size is less than "max.request.size" in producer
> > (default
> > > > > 1MB).
> > > > > > > > Therefore, the introduction of "batch.initial.size" won't
> > > decrease
> > > > > the
> > > > > > > > maximum record size that the producer can handle.
> > > > > > > >
> > > > > > > > > But isn't there the risk that drainBatchesForOneNode would
> > end
> > > up
> > > > > not
> > > > > > > > sending ready
> > > > > > > > batches well past when they ought to be sent (according to
> > their
> > > > > > > linger.ms
> > > > > > > > ),
> > > > > > > > because it's sending buffers for earlier partitions too
> > > > aggressively?
> > > > > > > >
> > > > > > > > Did you mean that we have a "max.request.size" per request
> > > (default
> > > > > is
> > > > > > > > 1MB), and before this KIP, the request can include 64 batches
> > in
> > > > > single
> > > > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be
> > > able
> > > > to
> > > > > > > > include 32 batches or less, because we aggressively sent more
> > > > records
> > > > > > in
> > > > > > > > one batch, is that what you meant? That's a really good point
> > > that
> > > > > I've
> > > > > > > > never thought about. I think your suggestion to go through
> > other
> > > > > > > partitions
> > > > > > > > that just fit "batch.size", or expire "linger.ms" first,
> > before
> > > > > > handling
> > > > > > > > the one that is > "batch.size" limit is not a good way,
> because
> > > it
> > > > > > might
> > > > > > > > cause the one with size > "batch.size" always in the lowest
> > > > priority,
> > > > > > and
> > > > > > > > cause starving issue that the batch won't have chance to get
> > > sent.
> > > > > > > >
> > > > > > > > I don't have better solution for it, but maybe I can firstly
> > > > decrease
> > > > > > the
> > > > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB in
> the
> > > KIP.
> > > > > > That
> > > > > > > > should alleviate the problem. And still improve the
> throughput.
> > > > What
> > > > > do
> > > > > > > you
> > > > > > > > think?
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > > I think this KIP would change the behaviour of producers
> > when
> > > > > there
> > > > > > > are
> > > > > > > > > multiple partitions ready to be sent
> > > > > > > > >
> > > > > > > > > This is correct, the pattern changes and becomes more
> > > > > coarse-grained.
> > > > > > > > But
> > > > > > > > > I don't think it changes fairness over the long run.  I
> think
> > > > it's
> > > > > a
> > > > > > > good
> > > > > > > > > idea to change drainIndex to be random rather than round
> > robin
> > > to
> > > > > > avoid
> > > > > > > > > forming patterns where some partitions would consistently
> get
> > > > > higher
> > > > > > > > > latencies than others because they wait longer for their
> > turn.
> > > > > > > > >
> > > > > > > > > If we really wanted to preserve the exact patterns, we
> could
> > > > either
> > > > > > try
> > > > > > > > to
> > > > > > > > > support multiple 16KB batches from one partition per
> request
> > > > > > (probably
> > > > > > > > > would require protocol change to change logic on the broker
> > for
> > > > > > > duplicate
> > > > > > > > > detection) or try to re-batch 16KB batches from accumulator
> > > into
> > > > > > larger
> > > > > > > > > batches during send (additional computations) or try to
> > > consider
> > > > > all
> > > > > > > > > partitions assigned to a broker to check if a new batch
> needs
> > > to
> > > > be
> > > > > > > > created
> > > > > > > > > (i.e. compare cumulative batch size from all partitions
> > > assigned
> > > > > to a
> > > > > > > > > broker and create new batch when cumulative size is 1MB,
> more
> > > > > > complex).
> > > > > > > > >
> > > > > > > > > Overall, it seems like just increasing the max batch size
> is
> > a
> > > > > > simpler
> > > > > > > > > solution and it does favor larger batch sizes, which is
> > > > beneficial
> > > > > > not
> > > > > > > > just
> > > > > > > > > for production.
> > > > > > > > >
> > > > > > > > > > ready batches well past when they ought to be sent
> > (according
> > > > to
> > > > > > > their
> > > > > > > > > linger.ms)
> > > > > > > > >
> > > > > > > > > The trigger for marking batches ready to be sent isn't
> > changed
> > > -
> > > > a
> > > > > > > batch
> > > > > > > > is
> > > > > > > > > ready to be sent once it reaches 16KB, so by the time
> larger
> > > > > batches
> > > > > > > > start
> > > > > > > > > forming, linger.ms wouldn't matter much because the
> batching
> > > > goal
> > > > > is
> > > > > > > met
> > > > > > > > > and the batch can be sent immediately.  Larger batches
> start
> > > > > forming
> > > > > > > once
> > > > > > > > > the client starts waiting for the server, in which case
> some
> > > data
> > > > > > will
> > > > > > > > wait
> > > > > > > > > its turn to be sent.  This will happen for some data
> > regardless
> > > > of
> > > > > > how
> > > > > > > we
> > > > > > > > > pick data to send, the question is just whether we'd have
> > some
> > > > > > > scenarios
> > > > > > > > > where some partitions would consistently experience higher
> > > > latency
> > > > > > than
> > > > > > > > > others.  I think picking drainIndex randomly would prevent
> > such
> > > > > > > > scenarios.
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> > > tbentley@redhat.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Luke,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP!
> > > > > > > > > >
> > > > > > > > > > Currently because buffers are allocated using batch.size
> it
> > > > means
> > > > > > we
> > > > > > > > can
> > > > > > > > > > handle records that are that large (e.g. one big record
> per
> > > > > batch).
> > > > > > > > > Doesn't
> > > > > > > > > > the introduction of smaller buffer sizes
> > (batch.initial.size)
> > > > > mean
> > > > > > a
> > > > > > > > > > corresponding decrease in the maximum record size that
> the
> > > > > producer
> > > > > > > can
> > > > > > > > > > handle? That might not be a problem if the user knows
> their
> > > > > maximum
> > > > > > > > > record
> > > > > > > > > > size and has tuned batch.initial.size accordingly, but if
> > the
> > > > > > default
> > > > > > > > for
> > > > > > > > > > batch.initial.size < batch.size it could cause
> regressions
> > > for
> > > > > > > existing
> > > > > > > > > > users with a large record size, I think. It should be
> > enough
> > > > for
> > > > > > > > > > batch.initial.size to default to batch.size, allowing
> users
> > > who
> > > > > > care
> > > > > > > > > about
> > > > > > > > > > the memory saving in the off-peak throughput case to do
> the
> > > > > tuning,
> > > > > > > but
> > > > > > > > > not
> > > > > > > > > > causing a regression for existing users.
> > > > > > > > > >
> > > > > > > > > > I think this KIP would change the behaviour of producers
> > when
> > > > > there
> > > > > > > are
> > > > > > > > > > multiple partitions ready to be sent: By sending all the
> > > ready
> > > > > > > buffers
> > > > > > > > > > (which may now be > batch.size) for the first partition,
> we
> > > > could
> > > > > > end
> > > > > > > > up
> > > > > > > > > > excluding ready buffers for other partitions from the
> > current
> > > > > send.
> > > > > > > In
> > > > > > > > > > other words, as I understand the KIP currently, there's a
> > > > change
> > > > > in
> > > > > > > > > > fairness. I think the code in
> > > > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > > > will
> > > > > > > > > > ensure fairness in the long run, because the drainIndex
> > will
> > > > > ensure
> > > > > > > > that
> > > > > > > > > > those other partitions each get their turn at being the
> > > first.
> > > > > But
> > > > > > > > isn't
> > > > > > > > > > there the risk that drainBatchesForOneNode would end up
> not
> > > > > sending
> > > > > > > > ready
> > > > > > > > > > batches well past when they ought to be sent (according
> to
> > > > their
> > > > > > > > > linger.ms
> > > > > > > > > > ),
> > > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > > aggressively?
> > > > > > > > Or,
> > > > > > > > > > to put it another way, perhaps the RecordAccumulator
> should
> > > > > > > round-robin
> > > > > > > > > the
> > > > > > > > > > ready buffers for _all_ the partitions before trying to
> > fill
> > > > the
> > > > > > > > > remaining
> > > > > > > > > > space with the extra buffers (beyond the batch.size
> limit)
> > > for
> > > > > the
> > > > > > > > first
> > > > > > > > > > partitions?
> > > > > > > > > >
> > > > > > > > > > Kind regards,
> > > > > > > > > >
> > > > > > > > > > Tom
> > > > > > > > > >
> > > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <
> > showuon@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > > > If no, I'm going to update the KIP based on my previous
> > > mail,
> > > > > and
> > > > > > > > > start a
> > > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> > > showuon@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Why do we have to reallocate the buffer? We can
> > keep a
> > > > > list
> > > > > > of
> > > > > > > > > > buffers
> > > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > > > > "buffer.initial.size",
> > > > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > > > ex:
> > > > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > > > | 4KB |
> > > > > > > > > > > >
> > > > > > > > > > > > b. when new records reached and the remaining buffer
> is
> > > not
> > > > > > > enough
> > > > > > > > > for
> > > > > > > > > > > the
> > > > > > > > > > > > records, we create another batch with
> > > "batch.initial.size"
> > > > > > buffer
> > > > > > > > > > > > ex: we already have 3KB of data in the 1st buffer,
> and
> > > here
> > > > > > comes
> > > > > > > > the
> > > > > > > > > > 2KB
> > > > > > > > > > > > record
> > > > > > > > > > > >
> > > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create new
> > > buffer,
> > > > > and
> > > > > > > > > linked
> > > > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > > >
> > > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > > > Thank you.
> > > > > > > > > > > >
> > > > > > > > > > > > 2. I think we should also consider tweaking the
> > semantics
> > > > of
> > > > > > > > > batch.size
> > > > > > > > > > > so
> > > > > > > > > > > > that the sent batches can be larger if the batch is
> not
> > > > ready
> > > > > > to
> > > > > > > be
> > > > > > > > > > sent
> > > > > > > > > > > > (while still respecting max.request.size and perhaps
> a
> > > new
> > > > > > > > > > > max.batch.size).
> > > > > > > > > > > >
> > > > > > > > > > > > --> In the KIP, I was trying to make the "batch.size"
> > as
> > > > the
> > > > > > > upper
> > > > > > > > > > bound
> > > > > > > > > > > > of the batch size, and introduce a
> "batch.initial.size"
> > > as
> > > > > > > initial
> > > > > > > > > > batch
> > > > > > > > > > > > size.
> > > > > > > > > > > > So are you saying that we can let "batch.size" as
> > initial
> > > > > batch
> > > > > > > > size
> > > > > > > > > > and
> > > > > > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > > > > > That's a good suggestion, but that would change the
> > > > semantics
> > > > > > of
> > > > > > > > > > > > "batch.size", which might surprise some users. I
> think
> > my
> > > > > > > original
> > > > > > > > > > > proposal
> > > > > > > > > > > > ("batch.initial.size") is safer for users. What do
> you
> > > > think?
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you.
> > > > > > > > > > > > Luke
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > > > ismael@juma.me.uk
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> I think we should also consider tweaking the
> semantics
> > > of
> > > > > > > > batch.size
> > > > > > > > > > so
> > > > > > > > > > > >> that the sent batches can be larger if the batch is
> > not
> > > > > ready
> > > > > > to
> > > > > > > > be
> > > > > > > > > > sent
> > > > > > > > > > > >> (while still respecting max.request.size and
> perhaps a
> > > new
> > > > > > > > > > > >> max.batch.size).
> > > > > > > > > > > >>
> > > > > > > > > > > >> Ismael
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > > > ismael@juma.me.uk
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for the KIP. Why do we have to reallocate
> the
> > > > > buffer?
> > > > > > > We
> > > > > > > > > can
> > > > > > > > > > > >> keep a
> > > > > > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Ismael
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > > > showuon@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > > >> >> I'd like to start the discussion for the
> proposal:
> > > > > KIP-782:
> > > > > > > > > > > Expandable
> > > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> The main purpose for this KIP is to have better
> > > memory
> > > > > > usage
> > > > > > > in
> > > > > > > > > > > >> producer,
> > > > > > > > > > > >> >> and also save users from the dilemma while
> setting
> > > the
> > > > > > batch
> > > > > > > > size
> > > > > > > > > > > >> >> configuration. After this KIP, users can set a
> > higher
> > > > > > > > batch.size
> > > > > > > > > > > >> without
> > > > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > > > "batch.initial.size"
> > > > > > > > > > and
> > > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> Thank you.
> > > > > > > > > > > >> >> Luke
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Artem Livshits <al...@confluent.io.INVALID>.
Hi Jun,

11. That was my initial thinking as well, but in a discussion some people
pointed out the change of behavior in some scenarios.  E.g. if someone for
some reason really wants batches to be at least 16KB and sets large
linger.ms, and most of the time the batches are filled quickly enough and
they observe a certain latency.  Then they upgrade their client with a
default size 256KB and the latency increases.  This could be seen as a
regression.  It could be fixed by just reducing linger.ms to specify the
expected latency, but still could be seen as a disruption by some users.
The other reason to have 2 sizes is to avoid allocating large buffers
upfront.

-Artem

On Wed, Dec 8, 2021 at 3:07 PM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 11. Got it. To me, batch.size is really used for throughput and not for
> latency guarantees. There is no guarantee when 16KB will be accumulated.
> So, if users want any latency guarantee, they will need to specify
> linger.ms accordingly.
> Then, batch.size can just be used to tune for throughput.
>
> 20. Could we also describe the unit of compression? Is
> it batch.initial.size, batch.size or batch.max.size?
>
> Thanks,
>
> Jun
>
> On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> <al...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > 10. My understanding is that MemoryRecords would under the covers be
> > allocated in chunks, so logically it still would be one MemoryRecords
> > object, it's just instead of allocating one large chunk upfront, smaller
> > chunks are allocated as needed to grow the batch and linked into a list.
> >
> > 11. The reason for 2 sizes is to avoid change of behavior when triggering
> > batch send with large linger.ms.  Currently, a batch send is triggered
> > once
> > the batch reaches 16KB by default, if we just raise the default to 256KB,
> > then the batch send will be delayed.  Using a separate value would allow
> > keeping the current behavior when sending the batch out, but provide
> better
> > throughput with high latency + high bandwidth channels.
> >
> > -Artem
> >
> > On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <ju...@confluent.io.invalid> wrote:
> >
> > > Hi, Luke,
> > >
> > > Thanks for the KIP.  A few comments below.
> > >
> > > 10. Accumulating small batches could improve memory usage. Will that
> > > introduce extra copying when generating a produce request? Currently, a
> > > produce request takes a single MemoryRecords per partition.
> > > 11. Do we need to introduce a new config batch.max.size? Could we just
> > > increase the default of batch.size? We probably need to have KIP-794
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > >
> > > resolved
> > > before increasing the default batch size since the larger the batch
> size,
> > > the worse the problem in KIP-794.
> > > 12. As for max.request.size, currently it's used for both the max
> record
> > > size and the max request size, which is unintuitive. Perhaps we could
> > > introduce a new config max.record.size that defaults to 1MB. We could
> > then
> > > increase max.request.size to sth like 10MB.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > > <al...@confluent.io.invalid> wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > I don't mind increasing the max.request.size to a higher number, e.g.
> > 2MB
> > > > could be good.  I think we should also run some benchmarks to see the
> > > > effects of different sizes.
> > > >
> > > > I agree that changing round robin to random solves an independent
> > > existing
> > > > issue, however the logic in this KIP exacerbates the issue, so there
> is
> > > > some dependency.
> > > >
> > > > -Artem
> > > >
> > > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com>
> wrote:
> > > >
> > > > > Hi Artem,
> > > > > Yes, I agree if we go with random selection instead of round-robin
> > > > > selection, the latency issue will be more fair. That is, if there
> are
> > > 10
> > > > > partitions, the 10th partition will always be the last choice in
> each
> > > > round
> > > > > in current design, but with random selection, the chance to be
> > selected
> > > > is
> > > > > more fair.
> > > > >
> > > > > However, I think that's kind of out of scope with this KIP. This is
> > an
> > > > > existing issue, and it might need further discussion to decide if
> > this
> > > > > change is necessary.
> > > > >
> > > > > I agree the default 32KB for "batch.max.size" might be not huge
> > > > improvement
> > > > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > > > "batch.max.size", and make the documentation clear that if the
> > > > > "batch.max.size"
> > > > > is increased, there might be chances that the "ready" partitions
> need
> > > to
> > > > > wait for next request to send to broker, because of the
> > > > "max.request.size"
> > > > > (default 1MB) limitation. "max.request.size" can also be considered
> > to
> > > > > increase to avoid this issue. What do you think?
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > > <al...@confluent.io.invalid> wrote:
> > > > >
> > > > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > > > >
> > > > > > I think 32KB is too small.  With 5 in-flight and 100ms latency we
> > can
> > > > > > produce 1.6MB/s per partition.  With 256KB we can produce
> 12.8MB/s
> > > per
> > > > > > partition.  We should probably set up some testing and see if
> 256KB
> > > has
> > > > > > problems.
> > > > > >
> > > > > > To illustrate latency dynamics, let's consider a simplified
> model:
> > 1
> > > > > > in-flight request per broker, produce latency 125ms, 256KB max
> > > request
> > > > > > size, 16 partitions assigned to the same broker, every second
> 128KB
> > > is
> > > > > > produced to each partition (total production rate is 2MB/sec).
> > > > > >
> > > > > > If the batch size is 16KB, then the pattern would be the
> following:
> > > > > >
> > > > > > 0ms - produce 128KB into each partition
> > > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > > 125ms - complete first 16KB from each partition, send next 16KB
> > > > > > 250ms - complete second 16KB, send next 16KB
> > > > > > ...
> > > > > > 1000ms - complete 8th 16KB from each partition
> > > > > >
> > > > > > from this model it's easy to see that there are 256KB that are
> sent
> > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are
> sent
> > in
> > > > > > 875ms.
> > > > > >
> > > > > > If the batch size is 256KB, then the pattern would be the
> > following:
> > > > > >
> > > > > > 0ms - produce 128KB into each partition
> > > > > > 0ms - take 128KB each from first 2 partitions and send (total
> > 256KB)
> > > > > > 125ms - complete 2 first partitions, send data from next 2
> > partitions
> > > > > > ...
> > > > > > 1000ms - complete last 2 partitions
> > > > > >
> > > > > > even though the pattern is different, there are still 256KB that
> > are
> > > > sent
> > > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are
> sent
> > in
> > > > > > 875ms.
> > > > > >
> > > > > > Now, in this example if we do strictly round-robin (current
> > > > > implementation)
> > > > > > and we have this exact pattern (not sure how often such regular
> > > pattern
> > > > > > would happen in practice -- I would expect that it would be a bit
> > > more
> > > > > > random), some partitions would experience higher latency than
> > others
> > > > (not
> > > > > > sure how much it would matter in practice -- in the end of the
> day
> > > some
> > > > > > bytes produced to a topic would have higher latency and some
> bytes
> > > > would
> > > > > > have lower latency).  This pattern is easily fixed by choosing
> the
> > > next
> > > > > > partition randomly instead of using round-robin.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <sh...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi Tom,
> > > > > > > Thanks for your comments. And thanks for Artem's explanation.
> > > > > > > Below is my response:
> > > > > > >
> > > > > > > > Currently because buffers are allocated using batch.size it
> > means
> > > > we
> > > > > > can
> > > > > > > handle records that are that large (e.g. one big record per
> > batch).
> > > > > > Doesn't
> > > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> > mean
> > > a
> > > > > > > corresponding decrease in the maximum record size that the
> > producer
> > > > can
> > > > > > > handle?
> > > > > > >
> > > > > > > Actually, the "batch.size" is only like a threshold to decide
> if
> > > the
> > > > > > batch
> > > > > > > is "ready to be sent". That is, even if you set the
> > > "batch.size=16KB"
> > > > > > > (default value), users can still send one record sized with
> 20KB,
> > > as
> > > > > long
> > > > > > > as the size is less than "max.request.size" in producer
> (default
> > > > 1MB).
> > > > > > > Therefore, the introduction of "batch.initial.size" won't
> > decrease
> > > > the
> > > > > > > maximum record size that the producer can handle.
> > > > > > >
> > > > > > > > But isn't there the risk that drainBatchesForOneNode would
> end
> > up
> > > > not
> > > > > > > sending ready
> > > > > > > batches well past when they ought to be sent (according to
> their
> > > > > > linger.ms
> > > > > > > ),
> > > > > > > because it's sending buffers for earlier partitions too
> > > aggressively?
> > > > > > >
> > > > > > > Did you mean that we have a "max.request.size" per request
> > (default
> > > > is
> > > > > > > 1MB), and before this KIP, the request can include 64 batches
> in
> > > > single
> > > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be
> > able
> > > to
> > > > > > > include 32 batches or less, because we aggressively sent more
> > > records
> > > > > in
> > > > > > > one batch, is that what you meant? That's a really good point
> > that
> > > > I've
> > > > > > > never thought about. I think your suggestion to go through
> other
> > > > > > partitions
> > > > > > > that just fit "batch.size", or expire "linger.ms" first,
> before
> > > > > handling
> > > > > > > the one that is > "batch.size" limit is not a good way, because
> > it
> > > > > might
> > > > > > > cause the one with size > "batch.size" always in the lowest
> > > priority,
> > > > > and
> > > > > > > cause starving issue that the batch won't have chance to get
> > sent.
> > > > > > >
> > > > > > > I don't have better solution for it, but maybe I can firstly
> > > decrease
> > > > > the
> > > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB in the
> > KIP.
> > > > > That
> > > > > > > should alleviate the problem. And still improve the throughput.
> > > What
> > > > do
> > > > > > you
> > > > > > > think?
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > > <al...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > > I think this KIP would change the behaviour of producers
> when
> > > > there
> > > > > > are
> > > > > > > > multiple partitions ready to be sent
> > > > > > > >
> > > > > > > > This is correct, the pattern changes and becomes more
> > > > coarse-grained.
> > > > > > > But
> > > > > > > > I don't think it changes fairness over the long run.  I think
> > > it's
> > > > a
> > > > > > good
> > > > > > > > idea to change drainIndex to be random rather than round
> robin
> > to
> > > > > avoid
> > > > > > > > forming patterns where some partitions would consistently get
> > > > higher
> > > > > > > > latencies than others because they wait longer for their
> turn.
> > > > > > > >
> > > > > > > > If we really wanted to preserve the exact patterns, we could
> > > either
> > > > > try
> > > > > > > to
> > > > > > > > support multiple 16KB batches from one partition per request
> > > > > (probably
> > > > > > > > would require protocol change to change logic on the broker
> for
> > > > > > duplicate
> > > > > > > > detection) or try to re-batch 16KB batches from accumulator
> > into
> > > > > larger
> > > > > > > > batches during send (additional computations) or try to
> > consider
> > > > all
> > > > > > > > partitions assigned to a broker to check if a new batch needs
> > to
> > > be
> > > > > > > created
> > > > > > > > (i.e. compare cumulative batch size from all partitions
> > assigned
> > > > to a
> > > > > > > > broker and create new batch when cumulative size is 1MB, more
> > > > > complex).
> > > > > > > >
> > > > > > > > Overall, it seems like just increasing the max batch size is
> a
> > > > > simpler
> > > > > > > > solution and it does favor larger batch sizes, which is
> > > beneficial
> > > > > not
> > > > > > > just
> > > > > > > > for production.
> > > > > > > >
> > > > > > > > > ready batches well past when they ought to be sent
> (according
> > > to
> > > > > > their
> > > > > > > > linger.ms)
> > > > > > > >
> > > > > > > > The trigger for marking batches ready to be sent isn't
> changed
> > -
> > > a
> > > > > > batch
> > > > > > > is
> > > > > > > > ready to be sent once it reaches 16KB, so by the time larger
> > > > batches
> > > > > > > start
> > > > > > > > forming, linger.ms wouldn't matter much because the batching
> > > goal
> > > > is
> > > > > > met
> > > > > > > > and the batch can be sent immediately.  Larger batches start
> > > > forming
> > > > > > once
> > > > > > > > the client starts waiting for the server, in which case some
> > data
> > > > > will
> > > > > > > wait
> > > > > > > > its turn to be sent.  This will happen for some data
> regardless
> > > of
> > > > > how
> > > > > > we
> > > > > > > > pick data to send, the question is just whether we'd have
> some
> > > > > > scenarios
> > > > > > > > where some partitions would consistently experience higher
> > > latency
> > > > > than
> > > > > > > > others.  I think picking drainIndex randomly would prevent
> such
> > > > > > > scenarios.
> > > > > > > >
> > > > > > > > -Artem
> > > > > > > >
> > > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> > tbentley@redhat.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Luke,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP!
> > > > > > > > >
> > > > > > > > > Currently because buffers are allocated using batch.size it
> > > means
> > > > > we
> > > > > > > can
> > > > > > > > > handle records that are that large (e.g. one big record per
> > > > batch).
> > > > > > > > Doesn't
> > > > > > > > > the introduction of smaller buffer sizes
> (batch.initial.size)
> > > > mean
> > > > > a
> > > > > > > > > corresponding decrease in the maximum record size that the
> > > > producer
> > > > > > can
> > > > > > > > > handle? That might not be a problem if the user knows their
> > > > maximum
> > > > > > > > record
> > > > > > > > > size and has tuned batch.initial.size accordingly, but if
> the
> > > > > default
> > > > > > > for
> > > > > > > > > batch.initial.size < batch.size it could cause regressions
> > for
> > > > > > existing
> > > > > > > > > users with a large record size, I think. It should be
> enough
> > > for
> > > > > > > > > batch.initial.size to default to batch.size, allowing users
> > who
> > > > > care
> > > > > > > > about
> > > > > > > > > the memory saving in the off-peak throughput case to do the
> > > > tuning,
> > > > > > but
> > > > > > > > not
> > > > > > > > > causing a regression for existing users.
> > > > > > > > >
> > > > > > > > > I think this KIP would change the behaviour of producers
> when
> > > > there
> > > > > > are
> > > > > > > > > multiple partitions ready to be sent: By sending all the
> > ready
> > > > > > buffers
> > > > > > > > > (which may now be > batch.size) for the first partition, we
> > > could
> > > > > end
> > > > > > > up
> > > > > > > > > excluding ready buffers for other partitions from the
> current
> > > > send.
> > > > > > In
> > > > > > > > > other words, as I understand the KIP currently, there's a
> > > change
> > > > in
> > > > > > > > > fairness. I think the code in
> > > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > > will
> > > > > > > > > ensure fairness in the long run, because the drainIndex
> will
> > > > ensure
> > > > > > > that
> > > > > > > > > those other partitions each get their turn at being the
> > first.
> > > > But
> > > > > > > isn't
> > > > > > > > > there the risk that drainBatchesForOneNode would end up not
> > > > sending
> > > > > > > ready
> > > > > > > > > batches well past when they ought to be sent (according to
> > > their
> > > > > > > > linger.ms
> > > > > > > > > ),
> > > > > > > > > because it's sending buffers for earlier partitions too
> > > > > aggressively?
> > > > > > > Or,
> > > > > > > > > to put it another way, perhaps the RecordAccumulator should
> > > > > > round-robin
> > > > > > > > the
> > > > > > > > > ready buffers for _all_ the partitions before trying to
> fill
> > > the
> > > > > > > > remaining
> > > > > > > > > space with the extra buffers (beyond the batch.size limit)
> > for
> > > > the
> > > > > > > first
> > > > > > > > > partitions?
> > > > > > > > >
> > > > > > > > > Kind regards,
> > > > > > > > >
> > > > > > > > > Tom
> > > > > > > > >
> > > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <
> showuon@gmail.com
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > > If no, I'm going to update the KIP based on my previous
> > mail,
> > > > and
> > > > > > > > start a
> > > > > > > > > > vote tomorrow or next week.
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> > showuon@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > >
> > > > > > > > > > > 1. Why do we have to reallocate the buffer? We can
> keep a
> > > > list
> > > > > of
> > > > > > > > > buffers
> > > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > > > "buffer.initial.size",
> > > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > > ex:
> > > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > > | 4KB |
> > > > > > > > > > >
> > > > > > > > > > > b. when new records reached and the remaining buffer is
> > not
> > > > > > enough
> > > > > > > > for
> > > > > > > > > > the
> > > > > > > > > > > records, we create another batch with
> > "batch.initial.size"
> > > > > buffer
> > > > > > > > > > > ex: we already have 3KB of data in the 1st buffer, and
> > here
> > > > > comes
> > > > > > > the
> > > > > > > > > 2KB
> > > > > > > > > > > record
> > > > > > > > > > >
> > > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create new
> > buffer,
> > > > and
> > > > > > > > linked
> > > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > > >
> > > > > > > > > > > Is that what you mean?
> > > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > > Thank you.
> > > > > > > > > > >
> > > > > > > > > > > 2. I think we should also consider tweaking the
> semantics
> > > of
> > > > > > > > batch.size
> > > > > > > > > > so
> > > > > > > > > > > that the sent batches can be larger if the batch is not
> > > ready
> > > > > to
> > > > > > be
> > > > > > > > > sent
> > > > > > > > > > > (while still respecting max.request.size and perhaps a
> > new
> > > > > > > > > > max.batch.size).
> > > > > > > > > > >
> > > > > > > > > > > --> In the KIP, I was trying to make the "batch.size"
> as
> > > the
> > > > > > upper
> > > > > > > > > bound
> > > > > > > > > > > of the batch size, and introduce a "batch.initial.size"
> > as
> > > > > > initial
> > > > > > > > > batch
> > > > > > > > > > > size.
> > > > > > > > > > > So are you saying that we can let "batch.size" as
> initial
> > > > batch
> > > > > > > size
> > > > > > > > > and
> > > > > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > > > > That's a good suggestion, but that would change the
> > > semantics
> > > > > of
> > > > > > > > > > > "batch.size", which might surprise some users. I think
> my
> > > > > > original
> > > > > > > > > > proposal
> > > > > > > > > > > ("batch.initial.size") is safer for users. What do you
> > > think?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > > ismael@juma.me.uk
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> I think we should also consider tweaking the semantics
> > of
> > > > > > > batch.size
> > > > > > > > > so
> > > > > > > > > > >> that the sent batches can be larger if the batch is
> not
> > > > ready
> > > > > to
> > > > > > > be
> > > > > > > > > sent
> > > > > > > > > > >> (while still respecting max.request.size and perhaps a
> > new
> > > > > > > > > > >> max.batch.size).
> > > > > > > > > > >>
> > > > > > > > > > >> Ismael
> > > > > > > > > > >>
> > > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > > ismael@juma.me.uk
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > Hi Luke,
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks for the KIP. Why do we have to reallocate the
> > > > buffer?
> > > > > > We
> > > > > > > > can
> > > > > > > > > > >> keep a
> > > > > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > > > > >> >
> > > > > > > > > > >> > Ismael
> > > > > > > > > > >> >
> > > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > > showuon@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > > >> >> I'd like to start the discussion for the proposal:
> > > > KIP-782:
> > > > > > > > > > Expandable
> > > > > > > > > > >> >> batch size in producer.
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> The main purpose for this KIP is to have better
> > memory
> > > > > usage
> > > > > > in
> > > > > > > > > > >> producer,
> > > > > > > > > > >> >> and also save users from the dilemma while setting
> > the
> > > > > batch
> > > > > > > size
> > > > > > > > > > >> >> configuration. After this KIP, users can set a
> higher
> > > > > > > batch.size
> > > > > > > > > > >> without
> > > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > > "batch.initial.size"
> > > > > > > > > and
> > > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > > >> >>
> > > > > > > > > > >> >>
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> Thank you.
> > > > > > > > > > >> >> Luke
> > > > > > > > > > >> >>
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Artem,

Thanks for the reply.

11. Got it. To me, batch.size is really used for throughput and not for
latency guarantees. There is no guarantee when 16KB will be accumulated.
So, if users want any latency guarantee, they will need to specify
linger.ms accordingly.
Then, batch.size can just be used to tune for throughput.

20. Could we also describe the unit of compression? Is
it batch.initial.size, batch.size or batch.max.size?

Thanks,

Jun

On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
<al...@confluent.io.invalid> wrote:

> Hi Jun,
>
> 10. My understanding is that MemoryRecords would under the covers be
> allocated in chunks, so logically it still would be one MemoryRecords
> object, it's just instead of allocating one large chunk upfront, smaller
> chunks are allocated as needed to grow the batch and linked into a list.
>
> 11. The reason for 2 sizes is to avoid change of behavior when triggering
> batch send with large linger.ms.  Currently, a batch send is triggered
> once
> the batch reaches 16KB by default, if we just raise the default to 256KB,
> then the batch send will be delayed.  Using a separate value would allow
> keeping the current behavior when sending the batch out, but provide better
> throughput with high latency + high bandwidth channels.
>
> -Artem
>
> On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <ju...@confluent.io.invalid> wrote:
>
> > Hi, Luke,
> >
> > Thanks for the KIP.  A few comments below.
> >
> > 10. Accumulating small batches could improve memory usage. Will that
> > introduce extra copying when generating a produce request? Currently, a
> > produce request takes a single MemoryRecords per partition.
> > 11. Do we need to introduce a new config batch.max.size? Could we just
> > increase the default of batch.size? We probably need to have KIP-794
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > >
> > resolved
> > before increasing the default batch size since the larger the batch size,
> > the worse the problem in KIP-794.
> > 12. As for max.request.size, currently it's used for both the max record
> > size and the max request size, which is unintuitive. Perhaps we could
> > introduce a new config max.record.size that defaults to 1MB. We could
> then
> > increase max.request.size to sth like 10MB.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> > <al...@confluent.io.invalid> wrote:
> >
> > > Hi Luke,
> > >
> > > I don't mind increasing the max.request.size to a higher number, e.g.
> 2MB
> > > could be good.  I think we should also run some benchmarks to see the
> > > effects of different sizes.
> > >
> > > I agree that changing round robin to random solves an independent
> > existing
> > > issue, however the logic in this KIP exacerbates the issue, so there is
> > > some dependency.
> > >
> > > -Artem
> > >
> > > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com> wrote:
> > >
> > > > Hi Artem,
> > > > Yes, I agree if we go with random selection instead of round-robin
> > > > selection, the latency issue will be more fair. That is, if there are
> > 10
> > > > partitions, the 10th partition will always be the last choice in each
> > > round
> > > > in current design, but with random selection, the chance to be
> selected
> > > is
> > > > more fair.
> > > >
> > > > However, I think that's kind of out of scope with this KIP. This is
> an
> > > > existing issue, and it might need further discussion to decide if
> this
> > > > change is necessary.
> > > >
> > > > I agree the default 32KB for "batch.max.size" might be not huge
> > > improvement
> > > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > > "batch.max.size", and make the documentation clear that if the
> > > > "batch.max.size"
> > > > is increased, there might be chances that the "ready" partitions need
> > to
> > > > wait for next request to send to broker, because of the
> > > "max.request.size"
> > > > (default 1MB) limitation. "max.request.size" can also be considered
> to
> > > > increase to avoid this issue. What do you think?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > > <al...@confluent.io.invalid> wrote:
> > > >
> > > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > > >
> > > > > I think 32KB is too small.  With 5 in-flight and 100ms latency we
> can
> > > > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s
> > per
> > > > > partition.  We should probably set up some testing and see if 256KB
> > has
> > > > > problems.
> > > > >
> > > > > To illustrate latency dynamics, let's consider a simplified model:
> 1
> > > > > in-flight request per broker, produce latency 125ms, 256KB max
> > request
> > > > > size, 16 partitions assigned to the same broker, every second 128KB
> > is
> > > > > produced to each partition (total production rate is 2MB/sec).
> > > > >
> > > > > If the batch size is 16KB, then the pattern would be the following:
> > > > >
> > > > > 0ms - produce 128KB into each partition
> > > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > > 125ms - complete first 16KB from each partition, send next 16KB
> > > > > 250ms - complete second 16KB, send next 16KB
> > > > > ...
> > > > > 1000ms - complete 8th 16KB from each partition
> > > > >
> > > > > from this model it's easy to see that there are 256KB that are sent
> > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent
> in
> > > > > 875ms.
> > > > >
> > > > > If the batch size is 256KB, then the pattern would be the
> following:
> > > > >
> > > > > 0ms - produce 128KB into each partition
> > > > > 0ms - take 128KB each from first 2 partitions and send (total
> 256KB)
> > > > > 125ms - complete 2 first partitions, send data from next 2
> partitions
> > > > > ...
> > > > > 1000ms - complete last 2 partitions
> > > > >
> > > > > even though the pattern is different, there are still 256KB that
> are
> > > sent
> > > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent
> in
> > > > > 875ms.
> > > > >
> > > > > Now, in this example if we do strictly round-robin (current
> > > > implementation)
> > > > > and we have this exact pattern (not sure how often such regular
> > pattern
> > > > > would happen in practice -- I would expect that it would be a bit
> > more
> > > > > random), some partitions would experience higher latency than
> others
> > > (not
> > > > > sure how much it would matter in practice -- in the end of the day
> > some
> > > > > bytes produced to a topic would have higher latency and some bytes
> > > would
> > > > > have lower latency).  This pattern is easily fixed by choosing the
> > next
> > > > > partition randomly instead of using round-robin.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <sh...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi Tom,
> > > > > > Thanks for your comments. And thanks for Artem's explanation.
> > > > > > Below is my response:
> > > > > >
> > > > > > > Currently because buffers are allocated using batch.size it
> means
> > > we
> > > > > can
> > > > > > handle records that are that large (e.g. one big record per
> batch).
> > > > > Doesn't
> > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> mean
> > a
> > > > > > corresponding decrease in the maximum record size that the
> producer
> > > can
> > > > > > handle?
> > > > > >
> > > > > > Actually, the "batch.size" is only like a threshold to decide if
> > the
> > > > > batch
> > > > > > is "ready to be sent". That is, even if you set the
> > "batch.size=16KB"
> > > > > > (default value), users can still send one record sized with 20KB,
> > as
> > > > long
> > > > > > as the size is less than "max.request.size" in producer (default
> > > 1MB).
> > > > > > Therefore, the introduction of "batch.initial.size" won't
> decrease
> > > the
> > > > > > maximum record size that the producer can handle.
> > > > > >
> > > > > > > But isn't there the risk that drainBatchesForOneNode would end
> up
> > > not
> > > > > > sending ready
> > > > > > batches well past when they ought to be sent (according to their
> > > > > linger.ms
> > > > > > ),
> > > > > > because it's sending buffers for earlier partitions too
> > aggressively?
> > > > > >
> > > > > > Did you mean that we have a "max.request.size" per request
> (default
> > > is
> > > > > > 1MB), and before this KIP, the request can include 64 batches in
> > > single
> > > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be
> able
> > to
> > > > > > include 32 batches or less, because we aggressively sent more
> > records
> > > > in
> > > > > > one batch, is that what you meant? That's a really good point
> that
> > > I've
> > > > > > never thought about. I think your suggestion to go through other
> > > > > partitions
> > > > > > that just fit "batch.size", or expire "linger.ms" first, before
> > > > handling
> > > > > > the one that is > "batch.size" limit is not a good way, because
> it
> > > > might
> > > > > > cause the one with size > "batch.size" always in the lowest
> > priority,
> > > > and
> > > > > > cause starving issue that the batch won't have chance to get
> sent.
> > > > > >
> > > > > > I don't have better solution for it, but maybe I can firstly
> > decrease
> > > > the
> > > > > > "batch.max.size" to 32KB, instead of aggressively 256KB in the
> KIP.
> > > > That
> > > > > > should alleviate the problem. And still improve the throughput.
> > What
> > > do
> > > > > you
> > > > > > think?
> > > > > >
> > > > > > Thank you.
> > > > > > Luke
> > > > > >
> > > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > > <al...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > > I think this KIP would change the behaviour of producers when
> > > there
> > > > > are
> > > > > > > multiple partitions ready to be sent
> > > > > > >
> > > > > > > This is correct, the pattern changes and becomes more
> > > coarse-grained.
> > > > > > But
> > > > > > > I don't think it changes fairness over the long run.  I think
> > it's
> > > a
> > > > > good
> > > > > > > idea to change drainIndex to be random rather than round robin
> to
> > > > avoid
> > > > > > > forming patterns where some partitions would consistently get
> > > higher
> > > > > > > latencies than others because they wait longer for their turn.
> > > > > > >
> > > > > > > If we really wanted to preserve the exact patterns, we could
> > either
> > > > try
> > > > > > to
> > > > > > > support multiple 16KB batches from one partition per request
> > > > (probably
> > > > > > > would require protocol change to change logic on the broker for
> > > > > duplicate
> > > > > > > detection) or try to re-batch 16KB batches from accumulator
> into
> > > > larger
> > > > > > > batches during send (additional computations) or try to
> consider
> > > all
> > > > > > > partitions assigned to a broker to check if a new batch needs
> to
> > be
> > > > > > created
> > > > > > > (i.e. compare cumulative batch size from all partitions
> assigned
> > > to a
> > > > > > > broker and create new batch when cumulative size is 1MB, more
> > > > complex).
> > > > > > >
> > > > > > > Overall, it seems like just increasing the max batch size is a
> > > > simpler
> > > > > > > solution and it does favor larger batch sizes, which is
> > beneficial
> > > > not
> > > > > > just
> > > > > > > for production.
> > > > > > >
> > > > > > > > ready batches well past when they ought to be sent (according
> > to
> > > > > their
> > > > > > > linger.ms)
> > > > > > >
> > > > > > > The trigger for marking batches ready to be sent isn't changed
> -
> > a
> > > > > batch
> > > > > > is
> > > > > > > ready to be sent once it reaches 16KB, so by the time larger
> > > batches
> > > > > > start
> > > > > > > forming, linger.ms wouldn't matter much because the batching
> > goal
> > > is
> > > > > met
> > > > > > > and the batch can be sent immediately.  Larger batches start
> > > forming
> > > > > once
> > > > > > > the client starts waiting for the server, in which case some
> data
> > > > will
> > > > > > wait
> > > > > > > its turn to be sent.  This will happen for some data regardless
> > of
> > > > how
> > > > > we
> > > > > > > pick data to send, the question is just whether we'd have some
> > > > > scenarios
> > > > > > > where some partitions would consistently experience higher
> > latency
> > > > than
> > > > > > > others.  I think picking drainIndex randomly would prevent such
> > > > > > scenarios.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <
> tbentley@redhat.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Luke,
> > > > > > > >
> > > > > > > > Thanks for the KIP!
> > > > > > > >
> > > > > > > > Currently because buffers are allocated using batch.size it
> > means
> > > > we
> > > > > > can
> > > > > > > > handle records that are that large (e.g. one big record per
> > > batch).
> > > > > > > Doesn't
> > > > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> > > mean
> > > > a
> > > > > > > > corresponding decrease in the maximum record size that the
> > > producer
> > > > > can
> > > > > > > > handle? That might not be a problem if the user knows their
> > > maximum
> > > > > > > record
> > > > > > > > size and has tuned batch.initial.size accordingly, but if the
> > > > default
> > > > > > for
> > > > > > > > batch.initial.size < batch.size it could cause regressions
> for
> > > > > existing
> > > > > > > > users with a large record size, I think. It should be enough
> > for
> > > > > > > > batch.initial.size to default to batch.size, allowing users
> who
> > > > care
> > > > > > > about
> > > > > > > > the memory saving in the off-peak throughput case to do the
> > > tuning,
> > > > > but
> > > > > > > not
> > > > > > > > causing a regression for existing users.
> > > > > > > >
> > > > > > > > I think this KIP would change the behaviour of producers when
> > > there
> > > > > are
> > > > > > > > multiple partitions ready to be sent: By sending all the
> ready
> > > > > buffers
> > > > > > > > (which may now be > batch.size) for the first partition, we
> > could
> > > > end
> > > > > > up
> > > > > > > > excluding ready buffers for other partitions from the current
> > > send.
> > > > > In
> > > > > > > > other words, as I understand the KIP currently, there's a
> > change
> > > in
> > > > > > > > fairness. I think the code in
> > > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > > will
> > > > > > > > ensure fairness in the long run, because the drainIndex will
> > > ensure
> > > > > > that
> > > > > > > > those other partitions each get their turn at being the
> first.
> > > But
> > > > > > isn't
> > > > > > > > there the risk that drainBatchesForOneNode would end up not
> > > sending
> > > > > > ready
> > > > > > > > batches well past when they ought to be sent (according to
> > their
> > > > > > > linger.ms
> > > > > > > > ),
> > > > > > > > because it's sending buffers for earlier partitions too
> > > > aggressively?
> > > > > > Or,
> > > > > > > > to put it another way, perhaps the RecordAccumulator should
> > > > > round-robin
> > > > > > > the
> > > > > > > > ready buffers for _all_ the partitions before trying to fill
> > the
> > > > > > > remaining
> > > > > > > > space with the extra buffers (beyond the batch.size limit)
> for
> > > the
> > > > > > first
> > > > > > > > partitions?
> > > > > > > >
> > > > > > > > Kind regards,
> > > > > > > >
> > > > > > > > Tom
> > > > > > > >
> > > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <showuon@gmail.com
> >
> > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Ismael and all devs,
> > > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > > If no, I'm going to update the KIP based on my previous
> mail,
> > > and
> > > > > > > start a
> > > > > > > > > vote tomorrow or next week.
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <
> showuon@gmail.com
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Ismael,
> > > > > > > > > > Thanks for your comments.
> > > > > > > > > >
> > > > > > > > > > 1. Why do we have to reallocate the buffer? We can keep a
> > > list
> > > > of
> > > > > > > > buffers
> > > > > > > > > > instead and avoid reallocation.
> > > > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > > "buffer.initial.size",
> > > > > > > > > > and link them together (with linked list)?
> > > > > > > > > > ex:
> > > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > > | 4KB |
> > > > > > > > > >
> > > > > > > > > > b. when new records reached and the remaining buffer is
> not
> > > > > enough
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > records, we create another batch with
> "batch.initial.size"
> > > > buffer
> > > > > > > > > > ex: we already have 3KB of data in the 1st buffer, and
> here
> > > > comes
> > > > > > the
> > > > > > > > 2KB
> > > > > > > > > > record
> > > > > > > > > >
> > > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > > now, record: 2KB coming
> > > > > > > > > > We fill the 1st 1KB into 1st buffer, and create new
> buffer,
> > > and
> > > > > > > linked
> > > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > > >
> > > > > > > > > > Is that what you mean?
> > > > > > > > > > If so, I think I like this idea!
> > > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > > Thank you.
> > > > > > > > > >
> > > > > > > > > > 2. I think we should also consider tweaking the semantics
> > of
> > > > > > > batch.size
> > > > > > > > > so
> > > > > > > > > > that the sent batches can be larger if the batch is not
> > ready
> > > > to
> > > > > be
> > > > > > > > sent
> > > > > > > > > > (while still respecting max.request.size and perhaps a
> new
> > > > > > > > > max.batch.size).
> > > > > > > > > >
> > > > > > > > > > --> In the KIP, I was trying to make the "batch.size" as
> > the
> > > > > upper
> > > > > > > > bound
> > > > > > > > > > of the batch size, and introduce a "batch.initial.size"
> as
> > > > > initial
> > > > > > > > batch
> > > > > > > > > > size.
> > > > > > > > > > So are you saying that we can let "batch.size" as initial
> > > batch
> > > > > > size
> > > > > > > > and
> > > > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > > > That's a good suggestion, but that would change the
> > semantics
> > > > of
> > > > > > > > > > "batch.size", which might surprise some users. I think my
> > > > > original
> > > > > > > > > proposal
> > > > > > > > > > ("batch.initial.size") is safer for users. What do you
> > think?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > > ismael@juma.me.uk
> > > > >
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> I think we should also consider tweaking the semantics
> of
> > > > > > batch.size
> > > > > > > > so
> > > > > > > > > >> that the sent batches can be larger if the batch is not
> > > ready
> > > > to
> > > > > > be
> > > > > > > > sent
> > > > > > > > > >> (while still respecting max.request.size and perhaps a
> new
> > > > > > > > > >> max.batch.size).
> > > > > > > > > >>
> > > > > > > > > >> Ismael
> > > > > > > > > >>
> > > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > > ismael@juma.me.uk
> > > > >
> > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hi Luke,
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks for the KIP. Why do we have to reallocate the
> > > buffer?
> > > > > We
> > > > > > > can
> > > > > > > > > >> keep a
> > > > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > > > >> >
> > > > > > > > > >> > Ismael
> > > > > > > > > >> >
> > > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > > showuon@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > > >> >> I'd like to start the discussion for the proposal:
> > > KIP-782:
> > > > > > > > > Expandable
> > > > > > > > > >> >> batch size in producer.
> > > > > > > > > >> >>
> > > > > > > > > >> >> The main purpose for this KIP is to have better
> memory
> > > > usage
> > > > > in
> > > > > > > > > >> producer,
> > > > > > > > > >> >> and also save users from the dilemma while setting
> the
> > > > batch
> > > > > > size
> > > > > > > > > >> >> configuration. After this KIP, users can set a higher
> > > > > > batch.size
> > > > > > > > > >> without
> > > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > > "batch.initial.size"
> > > > > > > > and
> > > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > > >> >>
> > > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > > >> >>
> > > > > > > > > >> >>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > > >> >>
> > > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > > >> >>
> > > > > > > > > >> >> Thank you.
> > > > > > > > > >> >> Luke
> > > > > > > > > >> >>
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-782: Expandable batch size in producer

Posted by Artem Livshits <al...@confluent.io.INVALID>.
Hi Jun,

10. My understanding is that MemoryRecords would under the covers be
allocated in chunks, so logically it still would be one MemoryRecords
object, it's just instead of allocating one large chunk upfront, smaller
chunks are allocated as needed to grow the batch and linked into a list.

11. The reason for 2 sizes is to avoid change of behavior when triggering
batch send with large linger.ms.  Currently, a batch send is triggered once
the batch reaches 16KB by default, if we just raise the default to 256KB,
then the batch send will be delayed.  Using a separate value would allow
keeping the current behavior when sending the batch out, but provide better
throughput with high latency + high bandwidth channels.

-Artem

On Tue, Dec 7, 2021 at 5:29 PM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Luke,
>
> Thanks for the KIP.  A few comments below.
>
> 10. Accumulating small batches could improve memory usage. Will that
> introduce extra copying when generating a produce request? Currently, a
> produce request takes a single MemoryRecords per partition.
> 11. Do we need to introduce a new config batch.max.size? Could we just
> increase the default of batch.size? We probably need to have KIP-794
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> >
> resolved
> before increasing the default batch size since the larger the batch size,
> the worse the problem in KIP-794.
> 12. As for max.request.size, currently it's used for both the max record
> size and the max request size, which is unintuitive. Perhaps we could
> introduce a new config max.record.size that defaults to 1MB. We could then
> increase max.request.size to sth like 10MB.
>
> Thanks,
>
> Jun
>
>
> On Mon, Nov 29, 2021 at 6:02 PM Artem Livshits
> <al...@confluent.io.invalid> wrote:
>
> > Hi Luke,
> >
> > I don't mind increasing the max.request.size to a higher number, e.g. 2MB
> > could be good.  I think we should also run some benchmarks to see the
> > effects of different sizes.
> >
> > I agree that changing round robin to random solves an independent
> existing
> > issue, however the logic in this KIP exacerbates the issue, so there is
> > some dependency.
> >
> > -Artem
> >
> > On Wed, Nov 24, 2021 at 12:43 AM Luke Chen <sh...@gmail.com> wrote:
> >
> > > Hi Artem,
> > > Yes, I agree if we go with random selection instead of round-robin
> > > selection, the latency issue will be more fair. That is, if there are
> 10
> > > partitions, the 10th partition will always be the last choice in each
> > round
> > > in current design, but with random selection, the chance to be selected
> > is
> > > more fair.
> > >
> > > However, I think that's kind of out of scope with this KIP. This is an
> > > existing issue, and it might need further discussion to decide if this
> > > change is necessary.
> > >
> > > I agree the default 32KB for "batch.max.size" might be not huge
> > improvement
> > > compared with 256KB. I'm thinking, maybe default to "64KB" for
> > > "batch.max.size", and make the documentation clear that if the
> > > "batch.max.size"
> > > is increased, there might be chances that the "ready" partitions need
> to
> > > wait for next request to send to broker, because of the
> > "max.request.size"
> > > (default 1MB) limitation. "max.request.size" can also be considered to
> > > increase to avoid this issue. What do you think?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
> > > <al...@confluent.io.invalid> wrote:
> > >
> > > > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> > > >
> > > > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > > > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s
> per
> > > > partition.  We should probably set up some testing and see if 256KB
> has
> > > > problems.
> > > >
> > > > To illustrate latency dynamics, let's consider a simplified model: 1
> > > > in-flight request per broker, produce latency 125ms, 256KB max
> request
> > > > size, 16 partitions assigned to the same broker, every second 128KB
> is
> > > > produced to each partition (total production rate is 2MB/sec).
> > > >
> > > > If the batch size is 16KB, then the pattern would be the following:
> > > >
> > > > 0ms - produce 128KB into each partition
> > > > 0ms - take 16KB from each partition send (total 256KB)
> > > > 125ms - complete first 16KB from each partition, send next 16KB
> > > > 250ms - complete second 16KB, send next 16KB
> > > > ...
> > > > 1000ms - complete 8th 16KB from each partition
> > > >
> > > > from this model it's easy to see that there are 256KB that are sent
> > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > > 875ms.
> > > >
> > > > If the batch size is 256KB, then the pattern would be the following:
> > > >
> > > > 0ms - produce 128KB into each partition
> > > > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > > > 125ms - complete 2 first partitions, send data from next 2 partitions
> > > > ...
> > > > 1000ms - complete last 2 partitions
> > > >
> > > > even though the pattern is different, there are still 256KB that are
> > sent
> > > > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > > > 875ms.
> > > >
> > > > Now, in this example if we do strictly round-robin (current
> > > implementation)
> > > > and we have this exact pattern (not sure how often such regular
> pattern
> > > > would happen in practice -- I would expect that it would be a bit
> more
> > > > random), some partitions would experience higher latency than others
> > (not
> > > > sure how much it would matter in practice -- in the end of the day
> some
> > > > bytes produced to a topic would have higher latency and some bytes
> > would
> > > > have lower latency).  This pattern is easily fixed by choosing the
> next
> > > > partition randomly instead of using round-robin.
> > > >
> > > > -Artem
> > > >
> > > > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <sh...@gmail.com>
> wrote:
> > > >
> > > > > Hi Tom,
> > > > > Thanks for your comments. And thanks for Artem's explanation.
> > > > > Below is my response:
> > > > >
> > > > > > Currently because buffers are allocated using batch.size it means
> > we
> > > > can
> > > > > handle records that are that large (e.g. one big record per batch).
> > > > Doesn't
> > > > > the introduction of smaller buffer sizes (batch.initial.size) mean
> a
> > > > > corresponding decrease in the maximum record size that the producer
> > can
> > > > > handle?
> > > > >
> > > > > Actually, the "batch.size" is only like a threshold to decide if
> the
> > > > batch
> > > > > is "ready to be sent". That is, even if you set the
> "batch.size=16KB"
> > > > > (default value), users can still send one record sized with 20KB,
> as
> > > long
> > > > > as the size is less than "max.request.size" in producer (default
> > 1MB).
> > > > > Therefore, the introduction of "batch.initial.size" won't decrease
> > the
> > > > > maximum record size that the producer can handle.
> > > > >
> > > > > > But isn't there the risk that drainBatchesForOneNode would end up
> > not
> > > > > sending ready
> > > > > batches well past when they ought to be sent (according to their
> > > > linger.ms
> > > > > ),
> > > > > because it's sending buffers for earlier partitions too
> aggressively?
> > > > >
> > > > > Did you mean that we have a "max.request.size" per request (default
> > is
> > > > > 1MB), and before this KIP, the request can include 64 batches in
> > single
> > > > > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be able
> to
> > > > > include 32 batches or less, because we aggressively sent more
> records
> > > in
> > > > > one batch, is that what you meant? That's a really good point that
> > I've
> > > > > never thought about. I think your suggestion to go through other
> > > > partitions
> > > > > that just fit "batch.size", or expire "linger.ms" first, before
> > > handling
> > > > > the one that is > "batch.size" limit is not a good way, because it
> > > might
> > > > > cause the one with size > "batch.size" always in the lowest
> priority,
> > > and
> > > > > cause starving issue that the batch won't have chance to get sent.
> > > > >
> > > > > I don't have better solution for it, but maybe I can firstly
> decrease
> > > the
> > > > > "batch.max.size" to 32KB, instead of aggressively 256KB in the KIP.
> > > That
> > > > > should alleviate the problem. And still improve the throughput.
> What
> > do
> > > > you
> > > > > think?
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > > > > <al...@confluent.io.invalid> wrote:
> > > > >
> > > > > > > I think this KIP would change the behaviour of producers when
> > there
> > > > are
> > > > > > multiple partitions ready to be sent
> > > > > >
> > > > > > This is correct, the pattern changes and becomes more
> > coarse-grained.
> > > > > But
> > > > > > I don't think it changes fairness over the long run.  I think
> it's
> > a
> > > > good
> > > > > > idea to change drainIndex to be random rather than round robin to
> > > avoid
> > > > > > forming patterns where some partitions would consistently get
> > higher
> > > > > > latencies than others because they wait longer for their turn.
> > > > > >
> > > > > > If we really wanted to preserve the exact patterns, we could
> either
> > > try
> > > > > to
> > > > > > support multiple 16KB batches from one partition per request
> > > (probably
> > > > > > would require protocol change to change logic on the broker for
> > > > duplicate
> > > > > > detection) or try to re-batch 16KB batches from accumulator into
> > > larger
> > > > > > batches during send (additional computations) or try to consider
> > all
> > > > > > partitions assigned to a broker to check if a new batch needs to
> be
> > > > > created
> > > > > > (i.e. compare cumulative batch size from all partitions assigned
> > to a
> > > > > > broker and create new batch when cumulative size is 1MB, more
> > > complex).
> > > > > >
> > > > > > Overall, it seems like just increasing the max batch size is a
> > > simpler
> > > > > > solution and it does favor larger batch sizes, which is
> beneficial
> > > not
> > > > > just
> > > > > > for production.
> > > > > >
> > > > > > > ready batches well past when they ought to be sent (according
> to
> > > > their
> > > > > > linger.ms)
> > > > > >
> > > > > > The trigger for marking batches ready to be sent isn't changed -
> a
> > > > batch
> > > > > is
> > > > > > ready to be sent once it reaches 16KB, so by the time larger
> > batches
> > > > > start
> > > > > > forming, linger.ms wouldn't matter much because the batching
> goal
> > is
> > > > met
> > > > > > and the batch can be sent immediately.  Larger batches start
> > forming
> > > > once
> > > > > > the client starts waiting for the server, in which case some data
> > > will
> > > > > wait
> > > > > > its turn to be sent.  This will happen for some data regardless
> of
> > > how
> > > > we
> > > > > > pick data to send, the question is just whether we'd have some
> > > > scenarios
> > > > > > where some partitions would consistently experience higher
> latency
> > > than
> > > > > > others.  I think picking drainIndex randomly would prevent such
> > > > > scenarios.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbentley@redhat.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Luke,
> > > > > > >
> > > > > > > Thanks for the KIP!
> > > > > > >
> > > > > > > Currently because buffers are allocated using batch.size it
> means
> > > we
> > > > > can
> > > > > > > handle records that are that large (e.g. one big record per
> > batch).
> > > > > > Doesn't
> > > > > > > the introduction of smaller buffer sizes (batch.initial.size)
> > mean
> > > a
> > > > > > > corresponding decrease in the maximum record size that the
> > producer
> > > > can
> > > > > > > handle? That might not be a problem if the user knows their
> > maximum
> > > > > > record
> > > > > > > size and has tuned batch.initial.size accordingly, but if the
> > > default
> > > > > for
> > > > > > > batch.initial.size < batch.size it could cause regressions for
> > > > existing
> > > > > > > users with a large record size, I think. It should be enough
> for
> > > > > > > batch.initial.size to default to batch.size, allowing users who
> > > care
> > > > > > about
> > > > > > > the memory saving in the off-peak throughput case to do the
> > tuning,
> > > > but
> > > > > > not
> > > > > > > causing a regression for existing users.
> > > > > > >
> > > > > > > I think this KIP would change the behaviour of producers when
> > there
> > > > are
> > > > > > > multiple partitions ready to be sent: By sending all the ready
> > > > buffers
> > > > > > > (which may now be > batch.size) for the first partition, we
> could
> > > end
> > > > > up
> > > > > > > excluding ready buffers for other partitions from the current
> > send.
> > > > In
> > > > > > > other words, as I understand the KIP currently, there's a
> change
> > in
> > > > > > > fairness. I think the code in
> > > > RecordAccumulator#drainBatchesForOneNode
> > > > > > will
> > > > > > > ensure fairness in the long run, because the drainIndex will
> > ensure
> > > > > that
> > > > > > > those other partitions each get their turn at being the first.
> > But
> > > > > isn't
> > > > > > > there the risk that drainBatchesForOneNode would end up not
> > sending
> > > > > ready
> > > > > > > batches well past when they ought to be sent (according to
> their
> > > > > > linger.ms
> > > > > > > ),
> > > > > > > because it's sending buffers for earlier partitions too
> > > aggressively?
> > > > > Or,
> > > > > > > to put it another way, perhaps the RecordAccumulator should
> > > > round-robin
> > > > > > the
> > > > > > > ready buffers for _all_ the partitions before trying to fill
> the
> > > > > > remaining
> > > > > > > space with the extra buffers (beyond the batch.size limit) for
> > the
> > > > > first
> > > > > > > partitions?
> > > > > > >
> > > > > > > Kind regards,
> > > > > > >
> > > > > > > Tom
> > > > > > >
> > > > > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <sh...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > Hi Ismael and all devs,
> > > > > > > > Is there any comments/suggestions to this KIP?
> > > > > > > > If no, I'm going to update the KIP based on my previous mail,
> > and
> > > > > > start a
> > > > > > > > vote tomorrow or next week.
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <showuon@gmail.com
> >
> > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Ismael,
> > > > > > > > > Thanks for your comments.
> > > > > > > > >
> > > > > > > > > 1. Why do we have to reallocate the buffer? We can keep a
> > list
> > > of
> > > > > > > buffers
> > > > > > > > > instead and avoid reallocation.
> > > > > > > > > -> Do you mean we allocate multiple buffers with
> > > > > > "buffer.initial.size",
> > > > > > > > > and link them together (with linked list)?
> > > > > > > > > ex:
> > > > > > > > > a. We allocate 4KB initial buffer
> > > > > > > > > | 4KB |
> > > > > > > > >
> > > > > > > > > b. when new records reached and the remaining buffer is not
> > > > enough
> > > > > > for
> > > > > > > > the
> > > > > > > > > records, we create another batch with "batch.initial.size"
> > > buffer
> > > > > > > > > ex: we already have 3KB of data in the 1st buffer, and here
> > > comes
> > > > > the
> > > > > > > 2KB
> > > > > > > > > record
> > > > > > > > >
> > > > > > > > > | 4KB (1KB remaining) |
> > > > > > > > > now, record: 2KB coming
> > > > > > > > > We fill the 1st 1KB into 1st buffer, and create new buffer,
> > and
> > > > > > linked
> > > > > > > > > together, and fill the rest of data into it
> > > > > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > > > > >
> > > > > > > > > Is that what you mean?
> > > > > > > > > If so, I think I like this idea!
> > > > > > > > > If not, please explain more detail about it.
> > > > > > > > > Thank you.
> > > > > > > > >
> > > > > > > > > 2. I think we should also consider tweaking the semantics
> of
> > > > > > batch.size
> > > > > > > > so
> > > > > > > > > that the sent batches can be larger if the batch is not
> ready
> > > to
> > > > be
> > > > > > > sent
> > > > > > > > > (while still respecting max.request.size and perhaps a new
> > > > > > > > max.batch.size).
> > > > > > > > >
> > > > > > > > > --> In the KIP, I was trying to make the "batch.size" as
> the
> > > > upper
> > > > > > > bound
> > > > > > > > > of the batch size, and introduce a "batch.initial.size" as
> > > > initial
> > > > > > > batch
> > > > > > > > > size.
> > > > > > > > > So are you saying that we can let "batch.size" as initial
> > batch
> > > > > size
> > > > > > > and
> > > > > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > > > > That's a good suggestion, but that would change the
> semantics
> > > of
> > > > > > > > > "batch.size", which might surprise some users. I think my
> > > > original
> > > > > > > > proposal
> > > > > > > > > ("batch.initial.size") is safer for users. What do you
> think?
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <
> > ismael@juma.me.uk
> > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> I think we should also consider tweaking the semantics of
> > > > > batch.size
> > > > > > > so
> > > > > > > > >> that the sent batches can be larger if the batch is not
> > ready
> > > to
> > > > > be
> > > > > > > sent
> > > > > > > > >> (while still respecting max.request.size and perhaps a new
> > > > > > > > >> max.batch.size).
> > > > > > > > >>
> > > > > > > > >> Ismael
> > > > > > > > >>
> > > > > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <
> > ismael@juma.me.uk
> > > >
> > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > Hi Luke,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for the KIP. Why do we have to reallocate the
> > buffer?
> > > > We
> > > > > > can
> > > > > > > > >> keep a
> > > > > > > > >> > list of buffers instead and avoid reallocation.
> > > > > > > > >> >
> > > > > > > > >> > Ismael
> > > > > > > > >> >
> > > > > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <
> > showuon@gmail.com>
> > > > > > wrote:
> > > > > > > > >> >
> > > > > > > > >> >> Hi Kafka dev,
> > > > > > > > >> >> I'd like to start the discussion for the proposal:
> > KIP-782:
> > > > > > > > Expandable
> > > > > > > > >> >> batch size in producer.
> > > > > > > > >> >>
> > > > > > > > >> >> The main purpose for this KIP is to have better memory
> > > usage
> > > > in
> > > > > > > > >> producer,
> > > > > > > > >> >> and also save users from the dilemma while setting the
> > > batch
> > > > > size
> > > > > > > > >> >> configuration. After this KIP, users can set a higher
> > > > > batch.size
> > > > > > > > >> without
> > > > > > > > >> >> worries, and of course, with an appropriate
> > > > > "batch.initial.size"
> > > > > > > and
> > > > > > > > >> >> "batch.reallocation.factor".
> > > > > > > > >> >>
> > > > > > > > >> >> Derailed description can be found here:
> > > > > > > > >> >>
> > > > > > > > >> >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > > > > >> >>
> > > > > > > > >> >> Any comments and feedback are welcome.
> > > > > > > > >> >>
> > > > > > > > >> >> Thank you.
> > > > > > > > >> >> Luke
> > > > > > > > >> >>
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>