Posted to jira@kafka.apache.org by "Jiangjie Qin (Jira)" <ji...@apache.org> on 2020/04/16 00:12:00 UTC

[jira] [Assigned] (KAFKA-9703) ProducerBatch.split takes up too many resources if the bigBatch is huge

     [ https://issues.apache.org/jira/browse/KAFKA-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin reassigned KAFKA-9703:
-----------------------------------

    Fix Version/s: 2.6.0
         Assignee: jiamei xie
       Resolution: Fixed

To be clear, the root cause of the OOM in the ticket description was KAFKA-9700. This ticket adds an optimization that releases the compression buffer when splitting a large {{ProducerBatch}}. Because it is only an optimization, we are not cherry-picking it to earlier release branches.

> ProducerBatch.split takes up too many resources if the bigBatch is huge
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-9703
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9703
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: jiamei xie
>            Assignee: jiamei xie
>            Priority: Major
>             Fix For: 2.6.0
>
>
> ProducerBatch.split takes up too many resources and might cause an OutOfMemoryError if the bigBatch is huge. How I found this issue is described in https://lists.apache.org/list.html?users@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE
> The following code is what consumes the resources.
> {code:java}
> for (Record record : recordBatch) {
>     assert thunkIter.hasNext();
>     Thunk thunk = thunkIter.next();
>     if (batch == null)
>         batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
>     // A newly created batch can always host the first message.
>     if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
>         // The current batch is full: keep it and start a new one for this record.
>         batches.add(batch);
>         batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
>         batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
>     }
> }
> {code}
> Following the pattern in RecordAccumulator#tryAppend, we can call closeForRecordAppends() once a batch is full.
> {code:java}
>     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
>                                          Callback callback, Deque<ProducerBatch> deque, long nowMs) {
>         ProducerBatch last = deque.peekLast();
>         if (last != null) {
>             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
>             if (future == null)
>                 last.closeForRecordAppends();
>             else
>                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
>         }
>         return null;
>     }
> {code}
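
The suggestion in the quoted description can be illustrated with a small standalone sketch. It is not the Kafka code and is not necessarily the change that was committed: ToyBatch, its bufferSize parameter, holdsAppendBuffer() and the split() helper are hypothetical stand-ins, and a plain byte array stands in for the compression buffer. It only shows the idea of closing each full batch for appends before the next batch is allocated, so that at most one append buffer is live during a split.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {

    /** Hypothetical stand-in for ProducerBatch; not the Kafka class. */
    static final class ToyBatch {
        private final int capacity;
        private int used = 0;
        private int recordCount = 0;
        // Stands in for the buffer a batch holds while it is open for appends.
        private byte[] appendBuffer;

        ToyBatch(int capacity, int bufferSize) {
            this.capacity = capacity;
            this.appendBuffer = new byte[bufferSize];
        }

        /** Returns false when the record does not fit; the first record always fits. */
        boolean tryAppend(byte[] value) {
            if (appendBuffer == null)
                throw new IllegalStateException("batch is closed for appends");
            if (recordCount > 0 && used + value.length > capacity)
                return false;
            used += value.length;
            recordCount++;
            return true;
        }

        /** Drops the append-time buffer; the batch accepts no further records. */
        void closeForRecordAppends() {
            appendBuffer = null;
        }

        boolean holdsAppendBuffer() {
            return appendBuffer != null;
        }

        int recordCount() {
            return recordCount;
        }
    }

    /** Splits records into batches, closing each batch as soon as it is full. */
    static List<ToyBatch> split(List<byte[]> records, int splitBatchSize, int bufferSize) {
        List<ToyBatch> batches = new ArrayList<>();
        ToyBatch batch = null;
        for (byte[] record : records) {
            if (batch == null)
                batch = new ToyBatch(splitBatchSize, bufferSize);
            if (!batch.tryAppend(record)) {
                // Full: release this batch's buffer before allocating the next batch.
                batch.closeForRecordAppends();
                batches.add(batch);
                batch = new ToyBatch(splitBatchSize, bufferSize);
                batch.tryAppend(record);
            }
        }
        if (batch != null) {
            // The last batch is also closed once the loop ends.
            batch.closeForRecordAppends();
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < 10; i++)
            records.add(new byte[40]);
        List<ToyBatch> batches = split(records, 100, 1024);
        long open = batches.stream().filter(ToyBatch::holdsAppendBuffer).count();
        // Prints: 5 batches, 0 still holding a buffer
        System.out.println(batches.size() + " batches, " + open + " still holding a buffer");
    }
}
```

Without the two closeForRecordAppends() calls, every batch produced by the split would keep its buffer until it is sent, so the number of live buffers would grow with the size of the original batch.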


