You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2022/09/15 00:39:29 UTC

[kafka] branch trunk updated: KAFKA-14156: Built-in partitioner may create suboptimal batches (#12570)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b2039f0ba KAFKA-14156: Built-in partitioner may create suboptimal batches (#12570)
2b2039f0ba is described below

commit 2b2039f0ba88e210dd09031291c050cfcda7ce4a
Author: Artem Livshits <84...@users.noreply.github.com>
AuthorDate: Wed Sep 14 17:39:14 2022 -0700

    KAFKA-14156: Built-in partitioner may create suboptimal batches (#12570)
    
    Now the built-in partitioner defers partition switch (while still
    accounting produced bytes) if there is no ready batch to send, thus
    avoiding switching partitions and creating fractional batches.
    
    Reviewers: Jun Rao <ju...@confluent.io>
---
 .../producer/internals/BuiltInPartitioner.java     | 41 +++++++++++++-
 .../producer/internals/RecordAccumulator.java      | 66 ++++++++++++++++++----
 .../producer/internals/RecordAccumulatorTest.java  | 54 ++++++++++++++----
 3 files changed, 140 insertions(+), 21 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
index a5805df56b..1f46dbb2fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
@@ -170,13 +170,52 @@ public class BuiltInPartitioner {
      * @param cluster The cluster information
      */
     void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+        updatePartitionInfo(partitionInfo, appendedBytes, cluster, true);
+    }
+
+    /**
+     * Update partition info with the number of bytes appended and maybe switch partition.
+     * NOTE this function needs to be called under the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @param appendedBytes The number of bytes appended to this partition
+     * @param cluster The cluster information
+     * @param enableSwitch If true, switch partition once produced enough bytes
+     */
+    void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) {
         // partitionInfo may be null if the caller didn't use built-in partitioner.
         if (partitionInfo == null)
             return;
 
         assert partitionInfo == stickyPartitionInfo.get();
         int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
-        if (producedBytes >= stickyBatchSize) {
+
+        // We're trying to switch partition once we produce stickyBatchSize bytes to a partition
+        // but doing so may hinder batching because partition switch may happen while batch isn't
+        // ready to send.  This situation is especially likely with high linger.ms setting.
+        // Consider the following example:
+        //   linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB
+        //     - first batch collects 12KB in 500ms, gets sent
+        //     - second batch collects 4KB, then we switch partition, so 4KB gets eventually sent
+        //     - ... and so on - we'd get 12KB and 4KB batches
+        // To get more optimal batching and avoid 4KB fractional batches, the caller may disallow
+        // partition switch if batch is not ready to send, so with the example above we'd avoid
+        // fractional 4KB batches: in that case the scenario would look like this:
+        //     - first batch collects 12KB in 500ms, gets sent
+        //     - second batch collects 4KB, but partition switch doesn't happen because batch in not ready
+        //     - second batch collects 12KB in 500ms, gets sent and now we switch partition.
+        //     - ... and so on - we'd just send 12KB batches
+        // We cap the produced bytes to not exceed 2x of the batch size to avoid pathological cases
+        // (e.g. if we have a mix of keyed and unkeyed messages, key messages may create an
+        // unready batch after the batch that disabled partition switch becomes ready).
+        // As a result, with high latency.ms setting we end up switching partitions after producing
+        // between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundary.
+        if (producedBytes >= stickyBatchSize * 2) {
+            log.trace("Produced {} bytes, exceeding twice the batch size of {} bytes, with switching set to {}",
+                producedBytes, stickyBatchSize, enableSwitch);
+        }
+
+        if (producedBytes >= stickyBatchSize && enableSwitch || producedBytes >= stickyBatchSize * 2) {
             // We've produced enough to this partition, switch to next.
             StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster));
             stickyPartitionInfo.set(newPartitionInfo);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a1f684ac95..983426316d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -213,6 +213,43 @@ public class RecordAccumulator {
             callbacks.setPartition(partition);
     }
 
+    /**
+     * Check if partition concurrently changed, or we need to complete previously disabled partition change.
+     *
+     * @param topic The topic
+     * @param topicInfo The topic info
+     * @param partitionInfo The built-in partitioner's partition info
+     * @param deque The partition queue
+     * @param nowMs The current time, in milliseconds
+     * @param cluster THe cluster metadata
+     * @return 'true' if partition changed and we need to get new partition info and retry,
+     *         'false' otherwise
+     */
+    private boolean partitionChanged(String topic,
+                                     TopicInfo topicInfo,
+                                     BuiltInPartitioner.StickyPartitionInfo partitionInfo,
+                                     Deque<ProducerBatch> deque, long nowMs,
+                                     Cluster cluster) {
+        if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+            log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
+                    partitionInfo.partition(), topic);
+            return true;
+        }
+
+        // We might have disabled partition switch if the queue had incomplete batches.
+        // Check if all batches are full now and switch .
+        if (allBatchesFull(deque)) {
+            topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 0, cluster, true);
+            if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+                log.trace("Completed previously disabled switch for topic {} partition {}, retrying",
+                        topic, partitionInfo.partition());
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     /**
      * Add a record to the accumulator, return the append result
      * <p>
@@ -275,14 +312,14 @@ public class RecordAccumulator {
                 Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
                 synchronized (dq) {
                     // After taking the lock, validate that the partition hasn't changed and retry.
-                    if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
-                        log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
-                                partitionInfo.partition(), topic);
+                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                         continue;
-                    }
+
                     RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                     if (appendResult != null) {
-                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
+                        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
+                        boolean enableSwitch = allBatchesFull(dq);
+                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                         return appendResult;
                     }
                 }
@@ -307,16 +344,16 @@ public class RecordAccumulator {
 
                 synchronized (dq) {
                     // After taking the lock, validate that the partition hasn't changed and retry.
-                    if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
-                        log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
-                                partitionInfo.partition(), topic);
+                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                         continue;
-                    }
+
                     RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                     // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                     if (appendResult.newBatchCreated)
                         buffer = null;
-                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
+                    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
+                    boolean enableSwitch = allBatchesFull(dq);
+                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                     return appendResult;
                 }
             }
@@ -378,6 +415,15 @@ public class RecordAccumulator {
     }
 
     /**
+     * Check if all batches in the queue are full.
+     */
+    private boolean allBatchesFull(Deque<ProducerBatch> deque) {
+        // Only the last batch may be incomplete, so we just check that.
+        ProducerBatch last = deque.peekLast();
+        return last == null || last.isFull();
+    }
+
+     /**
      *  Try to append to a ProducerBatch.
      *
      *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index cf991de338..9408172004 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -1105,7 +1105,7 @@ public class RecordAccumulatorTest {
             BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
 
             long totalSize = 1024 * 1024;
-            int batchSize = 128;  // note that this is also a "sticky" limit for the partitioner
+            int batchSize = 1024;  // note that this is also a "sticky" limit for the partitioner
             RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 0);
 
             // Set up callbacks so that we know what partition is chosen.
@@ -1129,31 +1129,34 @@ public class RecordAccumulatorTest {
             assertEquals(1, mockRandom.get());
 
             // Produce large record, we should exceed "sticky" limit, but produce to this partition
-            // as we switch after the "sticky" limit is exceeded.  The partition is switched after
-            // we produce.
+            // as we try to switch after the "sticky" limit is exceeded.  The switch is disabled
+            // because of incomplete batch.
             byte[] largeValue = new byte[batchSize];
             accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
                 callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(partition1, partition.get());
-            assertEquals(2, mockRandom.get());
+            assertEquals(1, mockRandom.get());
 
-            // Produce large record, we should switch to next partition.
+            // Produce large record, we should switch to next partition as we complete
+            // previous batch and exceeded sticky limit.
             accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
                 callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(partition2, partition.get());
-            assertEquals(3, mockRandom.get());
+            assertEquals(2, mockRandom.get());
 
-            // Produce large record, we should switch to next partition.
+            // Produce large record, we should switch to next partition as we complete
+            // previous batch and exceeded sticky limit.
             accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
                 callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(partition3, partition.get());
-            assertEquals(4, mockRandom.get());
+            assertEquals(3, mockRandom.get());
 
-            // Produce large record, we should switch to first partition again.
+            // Produce large record, we should switch to next partition as we complete
+            // previous batch and exceeded sticky limit.
             accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
                 callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
             assertEquals(partition1, partition.get());
-            assertEquals(5, mockRandom.get());
+            assertEquals(4, mockRandom.get());
         } finally {
             BuiltInPartitioner.mockRandom = null;
         }
@@ -1248,6 +1251,37 @@ public class RecordAccumulatorTest {
         }
     }
 
+    @Test
+    public void testBuiltInPartitionerFractionalBatches() throws Exception {
+        // Test how we avoid creating fractional batches with high linger.ms (see
+        // BuiltInPartitioner.updatePartitionInfo).
+        long totalSize = 1024 * 1024;
+        int batchSize = 512;  // note that this is also a "sticky" limit for the partitioner
+        int valSize = 32;
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 10);
+        byte[] value = new byte[valSize];
+
+        for (int c = 10; c-- > 0; ) {
+            // Produce about 2/3 of the batch size.
+            for (int recCount = batchSize * 2 / 3 / valSize; recCount-- > 0; ) {
+                accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0, null, value, Record.EMPTY_HEADERS,
+                    null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+            }
+
+            // Advance the time to make the batch ready.
+            time.sleep(10);
+
+            // We should have one batch ready.
+            Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            assertEquals(1, nodes.size(), "Should have 1 leader ready");
+            List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            assertEquals(1, batches.size(), "Should have 1 batch ready");
+            int actualBatchSize = batches.get(0).records().sizeInBytes();
+            assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
+            assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
+        }
+    }
+
     private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords)
         throws InterruptedException {
         Random random = new Random();