Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/04 21:35:33 UTC

[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r865302205


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -960,8 +1002,10 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer", cce);
             }
+
+            // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
+            // which means that the RecordAccumulator would pick a partition based on broker load.

Review Comment:
   I missed this earlier; it is now rephrased.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -25,12 +23,15 @@
 
     private final Cluster cluster;
     private final Serializer<K> keySerializer;
-    private final DefaultPartitioner defaultPartitioner;
 
+    @SuppressWarnings("deprecation")
+    private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
+
+    @SuppressWarnings("deprecation")
     public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
         this.cluster = cluster;
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new DefaultPartitioner();
+        this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   DefaultPartitioner implements onNewBatch, but DefaultStreamPartitioner never seems to call it (the DefaultPartitioner is a private field).  Without onNewBatch calls, DefaultPartitioner.partition keeps returning the same partition for unkeyed messages.
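
   To illustrate the concern, here is a minimal sketch of the sticky behavior. It is a simplified stand-in, not Kafka's DefaultPartitioner: the class name StickySketch and its fields are hypothetical, and it only models the unkeyed case. The point is that partition() keeps returning the cached value until the owner calls onNewBatch().

```java
import java.util.concurrent.ThreadLocalRandom;

// Simplified model of a sticky partitioner (hypothetical, NOT Kafka's DefaultPartitioner).
class StickySketch {
    private final int numPartitions;
    private int sticky = -1; // -1 means no sticky partition has been chosen yet

    StickySketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Returns the cached partition for unkeyed records; picks one on first use.
    int partition() {
        if (sticky < 0)
            sticky = ThreadLocalRandom.current().nextInt(numPartitions);
        return sticky;
    }

    // Switches to a different partition. If the owner never calls this,
    // partition() returns the same value forever.
    void onNewBatch() {
        if (numPartitions < 2)
            return;
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (next == sticky);
        sticky = next;
    }
}
```

   A wrapper that holds such an object privately and only calls partition() would send every unkeyed record to one partition.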



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per topic.
+ */
+public class BuiltInPartitioner {
+    private final Logger log;
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
+        this.log = logContext.logger(BuiltInPartitioner.class);
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0)
+                return availablePartitions.get(random % availablePartitions.size()).partition();
+
+            // We don't have available partitions, just pick one among all partitions.
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            return random % partitions.size();
+        } else {
+            // Calculate next partition based on load distribution.
+            // Note that partitions without leader are excluded from the partitionLoadStats.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
+            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+            // By construction, the cumulative frequency table is sorted, so we can use binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
+
+            // binarySearch returns the index of the found element, or -(insertion_point) - 1
+            // (where insertion_point is the index of the first element greater than the key).
+            // We need to get the index of the first value that is strictly greater, which
+            // would be the insertion point, except if we found the element that's equal to
+            // the searched value (in this case we need to get next).  For example, if we have
+            //  4 5 8
+            // and we're looking for 3, then we'd get the insertion_point = 0, and the function
+            // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're looking for 4, we'd
+            // get 0, and we need the next one, so adding 1 works here as well.
+            int partitionIndex = Math.abs(searchResult + 1);
+            assert partitionIndex < partitionLoadStats.length;
+            return partitionLoadStats.partitionIds[partitionIndex];
+        }
+    }
+
+    /**
+     * Test-only function.  When partition load stats are defined, return the end of range for the
+     * random number.
+     */
+    public int loadStatsRangeEnd() {
+        assert partitionLoadStats != null;
+        assert partitionLoadStats.length > 0;
+        return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+    }
+
+    /**
+     * Peek currently chosen sticky partition.  This method works in conjunction with {@link #isPartitionChanged}
+     * and {@link #updatePartitionInfo}.  The workflow is the following:
+     *
+     * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+     * 2. Lock partition's batch queue.
+     * 3. isPartitionChanged under lock to make sure that nobody raced us.
+     * 4. Append data to buffer.
+     * 5. updatePartitionInfo to update produced bytes and maybe switch partition.
+     *
+     *  It's important that steps 3-5 are under partition's batch queue lock.
+     *
+     * @param cluster The cluster information (needed if there is no current partition)
+     * @return sticky partition info object
+     */
+    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+        if (partitionInfo != null)
+            return partitionInfo;
+
+        // We're the first to create it.
+        int partition = nextPartition(cluster);
+        log.trace("Switching to partition {} in topic {}", partition, topic);
+        partitionInfo = new StickyPartitionInfo(partition);
+        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
+            return partitionInfo;
+
+        // Someone has raced us.
+        return stickyPartitionInfo.get();
+    }
+
+    /**
+     * Check if partition is changed by a concurrent thread.  NOTE this function needs to be called under
+     * the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @return true if sticky partition object is changed (race condition)
+     */
+    boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
+        // partitionInfo may be null if the caller didn't use built-in partitioner.
+        return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo;
+    }
+
+    /**
+     * Update partition info with the number of bytes appended and maybe switch partition.
+     * NOTE this function needs to be called under the partition's batch queue lock.
+     *
+     * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+     * @param appendedBytes The number of bytes appended to this partition
+     * @param cluster The cluster information
+     */
+    void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+        // partitionInfo may be null if the caller didn't use built-in partitioner.
+        if (partitionInfo == null)
+            return;
+
+        assert partitionInfo == stickyPartitionInfo.get();
+        int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
+        if (producedBytes >= stickyBatchSize) {
+            // We've produced enough to this partition, switch to next.
+            int partition = nextPartition(cluster);
+            log.trace("Switching to partition {} in topic {}", partition, topic);
+            StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(partition);
+            stickyPartitionInfo.set(newPartitionInfo);
+        }
+    }
+
+    /**
+     * Update partition load stats from the queue sizes of each partition.

Review Comment:
   Added comments to the params.
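
   For reference, the weighted lookup in nextPartition can be exercised in isolation. The sketch below copies only the binary-search step from the diff into a hypothetical helper (WeightedPickSketch.pickIndex is not part of the PR) and uses the 4 5 8 table from the code comment.

```java
import java.util.Arrays;

// Hypothetical helper that isolates the lookup step from nextPartition.
class WeightedPickSketch {
    // Returns the index of the first table entry strictly greater than
    // (random % lastEntry). The table must be sorted in increasing order.
    static int pickIndex(int[] cumulativeFrequencyTable, int random) {
        int length = cumulativeFrequencyTable.length;
        int weightedRandom = random % cumulativeFrequencyTable[length - 1];
        int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, length, weightedRandom);
        // Found at index i: we want i + 1. Not found: the result is
        // -(insertion_point) - 1, so adding 1 and taking abs gives insertion_point.
        return Math.abs(searchResult + 1);
    }
}
```

   With the table 4 5 8, the values 0..3 map to index 0, the value 4 maps to index 1, and 5..7 map to index 2, so the indices are chosen with weights 4, 1 and 3.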



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,296 @@
+    /**
+     * Update partition load stats from the queue sizes of each partition.
+     * NOTE: queueSizes are modified in place to avoid allocations
+     *
+     * @param queueSizes The queue sizes
+     * @param partitionIds The partition ids for the queues
+     * @param length The logical length of the arrays (could be less): we may eliminate some partitions
+     *               based on latency, but to avoid reallocation of the arrays, we just decrement
+     *               logical length
+     * Visible for testing
+     */
+    public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) {
+        if (queueSizes == null) {
+            log.trace("No load stats for topic {}, not using adaptive", topic);
+            partitionLoadStats = null;
+            return;
+        }
+        assert queueSizes.length == partitionIds.length;
+        assert length <= queueSizes.length;
+
+        // The queueSizes.length represents the number of all partitions in the topic and if we have
+        // less than 2 partitions, there is no need to do adaptive logic.
+        // If partitioner.availability.timeout.ms != 0, then partitions that experience high latencies
+        // (greater than partitioner.availability.timeout.ms) may be excluded, the length represents
+        // partitions that are not excluded.  If some partitions were excluded, we'd still want to
+        // go through adaptive logic, even if we have one partition.
+        // See also RecordAccumulator#partitionReady where the queueSizes are built.
+        if (length < 1 || queueSizes.length < 2) {
+            log.trace("The number of partitions is too small: {} and {}, not using adaptive for topic {}",

Review Comment:
   Added.
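
   The hunk above stops before the table is built, so the following is only a sketch of one way queue sizes could be turned into a cumulative frequency table. It assumes the sizes are inverted against (max + 1) so that shorter queues get more weight, then summed in place. The class and method names are hypothetical.

```java
// Hypothetical sketch; the inversion scheme is an assumption, not quoted from the PR.
class LoadStatsSketch {
    // Converts the first `length` queue sizes into a cumulative frequency table.
    // The input array is modified in place to avoid allocations.
    static int[] toCumulativeFrequencyTable(int[] queueSizes, int length) {
        int maxSizePlus1 = queueSizes[0];
        for (int i = 1; i < length; i++)
            maxSizePlus1 = Math.max(maxSizePlus1, queueSizes[i]);
        maxSizePlus1++;

        // Invert each size (a smaller queue gets a larger weight) and keep a running sum.
        queueSizes[0] = maxSizePlus1 - queueSizes[0];
        for (int i = 1; i < length; i++)
            queueSizes[i] = (maxSizePlus1 - queueSizes[i]) + queueSizes[i - 1];
        return queueSizes;
    }
}
```

   Under that assumption, queue sizes 0 3 1 invert to 4 1 3 and sum to 4 5 8, which matches the table used in the nextPartition comment.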



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -0,0 +1,296 @@
+    /**
+     * Peek currently chosen sticky partition.  This method works in conjunction with {@link #isPartitionChanged}
+     * and {@link #updatePartitionInfo}.  The workflow is the following:
+     *
+     * 1. peekCurrentPartitionInfo is called to know which partition to lock.
+     * 2. Lock partition's batch queue.
+     * 3. isPartitionChanged under lock to make sure that nobody raced us.
+     * 4. Append data to buffer.
+     * 5. updatePartitionInfo to update produced bytes and maybe switch partition.
+     *
+     *  It's important that steps 3-5 are under partition's batch queue lock.
+     *
+     * @param cluster The cluster information (needed if there is no current partition)
+     * @return sticky partition info object
+     */
+    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
+        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
+        if (partitionInfo != null)
+            return partitionInfo;
+
+        // We're the first to create it.
+        int partition = nextPartition(cluster);
+        log.trace("Switching to partition {} in topic {}", partition, topic);

Review Comment:
   Done.
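
   The five-step workflow from the peekCurrentPartitionInfo Javadoc can be sketched end to end as follows. This is a simplified model, not the RecordAccumulator code: the queues, the round-robin nextPartition stand-in, and all names here are hypothetical. It shows why the re-check has to happen under the queue lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the peek / lock / check / append / update workflow.
class StickyWorkflowSketch {
    static final class Info {
        final int partition;
        final AtomicInteger producedBytes = new AtomicInteger();
        Info(int partition) { this.partition = partition; }
    }

    private final AtomicReference<Info> current = new AtomicReference<>();
    private final List<Deque<byte[]>> queues = new ArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();
    private final int stickyBatchSize;

    StickyWorkflowSketch(int numPartitions, int stickyBatchSize) {
        this.stickyBatchSize = stickyBatchSize;
        for (int i = 0; i < numPartitions; i++)
            queues.add(new ArrayDeque<>());
    }

    // Round-robin stand-in for the load-based choice in the real partitioner.
    private int nextPartition() {
        return Math.floorMod(counter.getAndIncrement(), queues.size());
    }

    private Info peek() {
        Info info = current.get();
        if (info != null)
            return info;
        Info created = new Info(nextPartition());
        // If another thread won the race, use its object instead of ours.
        return current.compareAndSet(null, created) ? created : current.get();
    }

    // Appends a record and returns the partition it went to.
    int append(byte[] record) {
        while (true) {
            Info info = peek();                               // step 1
            Deque<byte[]> queue = queues.get(info.partition);
            synchronized (queue) {                            // step 2
                if (current.get() != info)                    // step 3: raced, retry
                    continue;
                queue.addLast(record);                        // step 4
                if (info.producedBytes.addAndGet(record.length) >= stickyBatchSize)
                    current.set(new Info(nextPartition()));   // step 5
                return info.partition;
            }
        }
    }
}
```

   With three partitions and a sticky limit of 10 bytes, appending 4-byte records yields partitions 0, 0, 0, 1: the third append crosses the limit and triggers the switch.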



##########
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java:
##########
@@ -117,10 +116,24 @@ public MockProducer(final Cluster cluster,
      *
      * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
      */
+    @SuppressWarnings("deprecation")
     public MockProducer(final boolean autoComplete,
                         final Serializer<K> keySerializer,
                         final Serializer<V> valueSerializer) {
-        this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
+        this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);

Review Comment:
   DefaultPartitioner implements onNewBatch, but MockProducer doesn't call it.  It only calls partition, which for unkeyed messages returns the same partition until onNewBatch is called.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1017,6 +1024,158 @@ public void testStickyBatches() throws Exception {
         assertEquals(appends, 2 * expectedAppends);
     }
 
+    @Test
+    public void testUniformBuiltInPartitioner() throws Exception {
+
+        try {
+            // Mock random number generator with just sequential integer.
+            AtomicInteger mockRandom = new AtomicInteger();
+            BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);
+
+            long totalSize = 1024 * 1024;
+            int batchSize = 128;  // note that this is also a "sticky" limit for the partitioner
+            RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 0);
+
+            // Set up callbacks so that we know what partition is chosen.
+            final int[] partition = {RecordMetadata.UNKNOWN_PARTITION};

Review Comment:
   Sure
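
   As a side note on the mocked random source used in this test, here is a small sketch of why a sequential supplier makes the uniform path deterministic. It mirrors the `random % size` selection from nextPartition, but UniformPickSketch and its static hook are hypothetical stand-ins rather than the BuiltInPartitioner itself.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Hypothetical stand-in for the uniform branch of nextPartition.
class UniformPickSketch {
    // Test hook: when set, replaces the real random source.
    static volatile Supplier<Integer> mockRandom = null;

    static int pick(int numPartitions) {
        Supplier<Integer> mock = mockRandom; // cache the volatile field in a local
        int random = mock != null
            ? mock.get()
            : (ThreadLocalRandom.current().nextInt() & 0x7fffffff); // force non-negative
        return random % numPartitions;
    }
}
```

   With a supplier that returns 0, 1, 2, 3, ... the picks for three partitions cycle 0, 1, 2, 0, so the test can assert exact partitions. The hook should be reset to null afterwards, since it is static.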


