Posted to commits@kafka.apache.org by ju...@apache.org on 2022/09/15 00:44:58 UTC
[kafka] branch 3.3 updated: KAFKA-14156: Built-in partitioner may create suboptimal batches (#12570)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 3b080a26fc KAFKA-14156: Built-in partitioner may create suboptimal batches (#12570)
3b080a26fc is described below
commit 3b080a26fc9a612601a28a14c6ae809ae625220f
Author: Artem Livshits <84...@users.noreply.github.com>
AuthorDate: Wed Sep 14 17:39:14 2022 -0700
KAFKA-14156: Built-in partitioner may create suboptimal batches (#12570)
Now the built-in partitioner defers the partition switch (while still
accounting for produced bytes) if there is no ready batch to send, thus
avoiding switching partitions and creating fractional batches.
Reviewers: Jun Rao <ju...@confluent.io>
---
.../producer/internals/BuiltInPartitioner.java | 41 +++++++++++++-
.../producer/internals/RecordAccumulator.java | 66 ++++++++++++++++++----
.../producer/internals/RecordAccumulatorTest.java | 54 ++++++++++++++----
3 files changed, 140 insertions(+), 21 deletions(-)
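In short: the built-in partitioner still tracks the bytes produced to the current partition, but it now switches at the sticky limit only when the accumulator reports that every batch in the partition's queue is full; otherwise the switch is deferred, with a hard cap at twice the sticky limit. A minimal sketch of the new decision rule (a hypothetical standalone method, not the committed code; names mirror the patch below):

    // Sketch only: producedBytes and stickyBatchSize are tracked by BuiltInPartitioner,
    // enableSwitch is what RecordAccumulator derives from allBatchesFull(deque).
    static boolean shouldSwitchPartition(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        // Switch at the sticky limit only when no incomplete batch blocks the switch,
        // but never defer the switch past twice the sticky limit.
        return (producedBytes >= stickyBatchSize && enableSwitch)
                || producedBytes >= stickyBatchSize * 2;
    }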
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
index a5805df56b..1f46dbb2fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
@@ -170,13 +170,52 @@ public class BuiltInPartitioner {
* @param cluster The cluster information
*/
void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+ updatePartitionInfo(partitionInfo, appendedBytes, cluster, true);
+ }
+
+ /**
+ * Update partition info with the number of bytes appended and maybe switch partition.
+ * NOTE this function needs to be called under the partition's batch queue lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ * @param enableSwitch If true, switch the partition once enough bytes have been produced
+ */
+ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) {
// partitionInfo may be null if the caller didn't use built-in partitioner.
if (partitionInfo == null)
return;
assert partitionInfo == stickyPartitionInfo.get();
int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
- if (producedBytes >= stickyBatchSize) {
+
+ // We're trying to switch partitions once we produce stickyBatchSize bytes to a partition,
+ // but doing so may hinder batching because the partition switch may happen while the batch isn't
+ // ready to send. This situation is especially likely with a high linger.ms setting.
+ // Consider the following example:
+ // linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB
+ // - first batch collects 12KB in 500ms, gets sent
+ // - second batch collects 4KB, then we switch partition, so 4KB gets eventually sent
+ // - ... and so on - we'd get 12KB and 4KB batches
+ // To get better batching and avoid fractional 4KB batches, the caller may disallow the
+ // partition switch if the batch is not ready to send. With the example above, the
+ // scenario would then look like this:
+ // - first batch collects 12KB in 500ms, gets sent
+ // - second batch collects 4KB, but the partition switch doesn't happen because the batch is not ready
+ // - the second batch keeps collecting, reaches 12KB in 500ms, gets sent, and now we switch partitions
+ // - ... and so on - we'd just send 12KB batches
+ // We cap the produced bytes so they don't exceed 2x the batch size, to avoid pathological cases
+ // (e.g. if we have a mix of keyed and unkeyed messages, keyed messages may create an
+ // unready batch after the batch that disabled the partition switch becomes ready).
+ // As a result, with a high linger.ms setting we end up switching partitions after producing
+ // between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundaries.
+ if (producedBytes >= stickyBatchSize * 2) {
+ log.trace("Produced {} bytes, exceeding twice the batch size of {} bytes, with switching set to {}",
+ producedBytes, stickyBatchSize, enableSwitch);
+ }
+
+ if (producedBytes >= stickyBatchSize && enableSwitch || producedBytes >= stickyBatchSize * 2) {
// We've produced enough to this partition, switch to next.
StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster));
stickyPartitionInfo.set(newPartitionInfo);
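For context, the fractional-batch scenario described in the comment above corresponds roughly to a producer configured as sketched below. This is an illustrative example only (assumed broker address, serializers, and class name; not part of the patch); no partitioner.class is set, so unkeyed records go through the built-in partitioner:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class HighLingerProducerExample {  // hypothetical example class
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.LINGER_MS_CONFIG, 500);     // high linger.ms, as in the comment's example
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // batch.size = 16KB
            // ProducerConfig.PARTITIONER_CLASS_CONFIG is not set, so the built-in partitioner is used.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Send records with a null key at ~12KB per 500ms to reproduce the scenario.
            }
        }
    }

With the previous behavior such a producer could alternate roughly 12KB and 4KB batches; with this change the partition switch is deferred until the queue has no incomplete batch (or until 2x the sticky limit is reached), so it keeps sending full 12KB batches instead.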
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a1f684ac95..983426316d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -213,6 +213,43 @@ public class RecordAccumulator {
callbacks.setPartition(partition);
}
+ /**
+ * Check if the partition has changed concurrently, or if we need to complete a previously disabled partition switch.
+ *
+ * @param topic The topic
+ * @param topicInfo The topic info
+ * @param partitionInfo The built-in partitioner's partition info
+ * @param deque The partition queue
+ * @param nowMs The current time, in milliseconds
+ * @param cluster The cluster metadata
+ * @return 'true' if the partition has changed and we need to get new partition info and retry,
+ * 'false' otherwise
+ */
+ private boolean partitionChanged(String topic,
+ TopicInfo topicInfo,
+ BuiltInPartitioner.StickyPartitionInfo partitionInfo,
+ Deque<ProducerBatch> deque, long nowMs,
+ Cluster cluster) {
+ if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+ log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
+ partitionInfo.partition(), topic);
+ return true;
+ }
+
+ // We might have disabled the partition switch if the queue had incomplete batches.
+ // Check if all batches are full now and, if so, switch.
+ if (allBatchesFull(deque)) {
+ topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 0, cluster, true);
+ if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
+ log.trace("Completed previously disabled switch for topic {} partition {}, retrying",
+ topic, partitionInfo.partition());
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* Add a record to the accumulator, return the append result
* <p>
@@ -275,14 +312,14 @@ public class RecordAccumulator {
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
// After taking the lock, validate that the partition hasn't changed and retry.
- if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
- log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
- partitionInfo.partition(), topic);
+ if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
- }
+
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
- topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
+ // If the queue has incomplete batches, we disable the partition switch (see comments in updatePartitionInfo).
+ boolean enableSwitch = allBatchesFull(dq);
+ topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
@@ -307,16 +344,16 @@ public class RecordAccumulator {
synchronized (dq) {
// After taking the lock, validate that the partition hasn't changed and retry.
- if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
- log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
- partitionInfo.partition(), topic);
+ if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
- }
+
RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
// Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
if (appendResult.newBatchCreated)
buffer = null;
- topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
+ // If the queue has incomplete batches, we disable the partition switch (see comments in updatePartitionInfo).
+ boolean enableSwitch = allBatchesFull(dq);
+ topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
@@ -378,6 +415,15 @@ public class RecordAccumulator {
}
/**
+ * Check if all batches in the queue are full.
+ */
+ private boolean allBatchesFull(Deque<ProducerBatch> deque) {
+ // Only the last batch may be incomplete, so we just check that.
+ ProducerBatch last = deque.peekLast();
+ return last == null || last.isFull();
+ }
+
+ /**
* Try to append to a ProducerBatch.
*
* If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index cf991de338..9408172004 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -1105,7 +1105,7 @@ public class RecordAccumulatorTest {
BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
long totalSize = 1024 * 1024;
- int batchSize = 128; // note that this is also a "sticky" limit for the partitioner
+ int batchSize = 1024; // note that this is also a "sticky" limit for the partitioner
RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 0);
// Set up callbacks so that we know what partition is chosen.
@@ -1129,31 +1129,34 @@ public class RecordAccumulatorTest {
assertEquals(1, mockRandom.get());
// Produce large record, we should exceed "sticky" limit, but produce to this partition
- // as we switch after the "sticky" limit is exceeded. The partition is switched after
- // we produce.
+ // as we try to switch after the "sticky" limit is exceeded. The switch is disabled
+ // because of the incomplete batch.
byte[] largeValue = new byte[batchSize];
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertEquals(partition1, partition.get());
- assertEquals(2, mockRandom.get());
+ assertEquals(1, mockRandom.get());
- // Produce large record, we should switch to next partition.
+ // Produce large record, we should switch to the next partition as we have completed
+ // the previous batch and exceeded the sticky limit.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertEquals(partition2, partition.get());
- assertEquals(3, mockRandom.get());
+ assertEquals(2, mockRandom.get());
- // Produce large record, we should switch to next partition.
+ // Produce large record, we should switch to the next partition as we have completed
+ // the previous batch and exceeded the sticky limit.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertEquals(partition3, partition.get());
- assertEquals(4, mockRandom.get());
+ assertEquals(3, mockRandom.get());
- // Produce large record, we should switch to first partition again.
+ // Produce large record, we should switch to the next partition as we have completed
+ // the previous batch and exceeded the sticky limit.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertEquals(partition1, partition.get());
- assertEquals(5, mockRandom.get());
+ assertEquals(4, mockRandom.get());
} finally {
BuiltInPartitioner.mockRandom = null;
}
@@ -1248,6 +1251,37 @@ public class RecordAccumulatorTest {
}
}
+ @Test
+ public void testBuiltInPartitionerFractionalBatches() throws Exception {
+ // Test how we avoid creating fractional batches with high linger.ms (see
+ // BuiltInPartitioner.updatePartitionInfo).
+ long totalSize = 1024 * 1024;
+ int batchSize = 512; // note that this is also a "sticky" limit for the partitioner
+ int valSize = 32;
+ RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 10);
+ byte[] value = new byte[valSize];
+
+ for (int c = 10; c-- > 0; ) {
+ // Produce about 2/3 of the batch size.
+ for (int recCount = batchSize * 2 / 3 / valSize; recCount-- > 0; ) {
+ accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0, null, value, Record.EMPTY_HEADERS,
+ null, maxBlockTimeMs, false, time.milliseconds(), cluster);
+ }
+
+ // Advance the time to make the batch ready.
+ time.sleep(10);
+
+ // We should have one batch ready.
+ Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ assertEquals(1, nodes.size(), "Should have 1 leader ready");
+ List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+ assertEquals(1, batches.size(), "Should have 1 batch ready");
+ int actualBatchSize = batches.get(0).records().sizeInBytes();
+ assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
+ assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
+ }
+ }
+
private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords)
throws InterruptedException {
Random random = new Random();