Posted to jira@kafka.apache.org by "jiamei xie (Jira)" <ji...@apache.org> on 2020/03/11 09:21:00 UTC

[jira] [Created] (KAFKA-9703) ProducerBatch.split takes up too many resources if the bigBatch is huge

jiamei xie created KAFKA-9703:
---------------------------------

             Summary: ProducerBatch.split takes up too many resources if the bigBatch is huge
                 Key: KAFKA-9703
                 URL: https://issues.apache.org/jira/browse/KAFKA-9703
             Project: Kafka
          Issue Type: Bug
            Reporter: jiamei xie


ProducerBatch.split takes up too many resources and might cause an OutOfMemory error if the bigBatch is huge. Details about how I found this issue are in https://lists.apache.org/list.html?users@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE
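
For context, a scenario like the one in that thread can be set up with producer settings along these lines (a hypothetical sketch, not taken from the report: a client-side batch.size much larger than what the broker accepts, so a MESSAGE_TOO_LARGE response forces the accumulator to split the big batch):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BigBatchProducer {
    public static void main(String[] args) {
        // Hypothetical settings: the client-side batch is allowed to grow far beyond
        // what the broker accepts, so the broker replies MESSAGE_TOO_LARGE and the
        // producer falls back to ProducerBatch.split on a very large batch.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10 * 1024 * 1024); // 10 MB client-side batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);               // give batches time to fill up

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100_000; i++)
                producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));
        }
    }
}
{code}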

The following is the code that takes a lot of resources:

{code:java}
        for (Record record : recordBatch) {
            assert thunkIter.hasNext();
            Thunk thunk = thunkIter.next();
            if (batch == null)
                batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);

            // A newly created batch can always host the first message.
            if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
                batches.add(batch);
                batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
                batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
            }
        }
{code}

Referring to RecordAccumulator#tryAppend, we can call closeForRecordAppends() after a batch is full:

{code:java}
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }
{code}
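
Applied to the split loop above, the change could look roughly like the sketch below (untested; only the closeForRecordAppends() call is new):

{code:java}
        for (Record record : recordBatch) {
            assert thunkIter.hasNext();
            Thunk thunk = thunkIter.next();
            if (batch == null)
                batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);

            // A newly created batch can always host the first message.
            if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
                // The batch is full: stop accepting appends so its write buffer can be
                // released now instead of staying allocated until the whole split finishes.
                batch.closeForRecordAppends();
                batches.add(batch);
                batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
                batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
            }
        }
{code}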



--
This message was sent by Atlassian Jira
(v8.3.4#803005)